UBERF-7690: Operation log support + fixes (#6337)

Signed-off-by: Andrey Sobolev <haiodo@gmail.com>
This commit is contained in:
Andrey Sobolev 2024-08-16 13:54:23 +07:00 committed by GitHub
parent cab8ded85a
commit 2caa8590f9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
23 changed files with 691 additions and 390 deletions

View File

@ -1,7 +1,16 @@
// Basic performance metrics suite.
import { generateId } from '../utils'
import { childMetrics, measure, newMetrics } from './metrics'
import { FullParamsType, MeasureContext, MeasureLogger, Metrics, ParamsType } from './types'
import {
FullParamsType,
MeasureContext,
MeasureLogger,
Metrics,
ParamsType,
type OperationLog,
type OperationLogEntry
} from './types'
/**
* @public
@ -98,21 +107,29 @@ export class MeasureMetricsContext implements MeasureContext {
)
}
async with<T>(
with<T>(
name: string,
params: ParamsType,
op: (ctx: MeasureContext) => T | Promise<T>,
fullParams?: ParamsType | (() => FullParamsType)
): Promise<T> {
const c = this.newChild(name, params, fullParams, this.logger)
let needFinally = true
try {
let value = op(c)
if (value instanceof Promise) {
value = await value
const value = op(c)
if (value != null && value instanceof Promise) {
needFinally = false
void value.finally(() => {
c.end()
})
return value
} else {
return Promise.resolve(value)
}
return value
} finally {
c.end()
if (needFinally) {
c.end()
}
}
}
@ -176,3 +193,96 @@ export function withContext (name: string, params: ParamsType = {}): any {
return descriptor
}
}
// Module-level switch; when false the operation-log helpers below are no-ops.
let operationProfiling = false

/**
 * Enable or disable collection of per-operation profiling logs.
 * @public
 */
export function setOperationLogProfiling (value: boolean): void {
operationProfiling = value
}
/**
 * Attach a fresh OperationLog to the given context when profiling is enabled.
 *
 * Assigns a new id to `ctx` and, when the context carries a metrics node,
 * stores the log on that node under the id so descendants can locate it by
 * walking the context chain. Returns an empty object when profiling is off.
 */
export function registerOperationLog (ctx: MeasureContext): { opLogMetrics?: Metrics, op?: OperationLog } {
  if (!operationProfiling) {
    return {}
  }
  const entry: OperationLog = { start: Date.now(), ops: [], end: -1 }
  ctx.id = generateId()
  let owner: Metrics | undefined
  const metrics = ctx.metrics
  if (metrics !== undefined) {
    metrics.opLog = metrics.opLog ?? {}
    metrics.opLog[ctx.id] = entry
    owner = metrics
  }
  return { opLogMetrics: owner, op: entry }
}
/**
 * Mark the operation log `op` as finished and prune the owning metrics node.
 * No-op unless operation profiling is enabled.
 */
export function updateOperationLog (opLogMetrics: Metrics | undefined, op: OperationLog | undefined): void {
if (!operationProfiling) {
return
}
if (op !== undefined) {
op.end = Date.now()
}
// Prune: keep every still-running entry (end === -1) plus at most the
// 30 most recently started completed entries.
if (opLogMetrics?.opLog !== undefined) {
const entries = Object.entries(opLogMetrics.opLog)
const incomplete = entries.filter((it) => it[1].end === -1)
const complete = entries.filter((it) => it[1].end !== -1)
complete.sort((a, b) => a[1].start - b[1].start)
if (complete.length > 30) {
complete.splice(0, complete.length - 30)
}
opLogMetrics.opLog = Object.fromEntries(incomplete.concat(complete))
}
}
/**
 * Record a single async operation into the nearest registered OperationLog.
 *
 * Walks the context chain upward to find the metrics node that holds an opLog
 * and the closest context id registered via registerOperationLog. When found,
 * an entry is created before invoking `op` (so the start time covers the
 * synchronous portion) and completed when the returned promise settles.
 *
 * @param ctx - measure context the operation runs under
 * @param name - operation name stored in the log entry
 * @param params - parameters merged into the entry on completion
 * @param op - the operation to execute; its promise is returned unchanged
 * @param fullParams - extra params, or a thunk evaluated lazily on completion
 */
export function addOperation<T> (
  ctx: MeasureContext,
  name: string,
  params: ParamsType,
  op: (ctx: MeasureContext) => Promise<T>,
  fullParams?: FullParamsType
): Promise<T> {
  if (!operationProfiling) {
    return op(ctx)
  }
  // Locate the opLog-carrying metrics node and the nearest registered id.
  let opLogMetrics: Metrics | undefined
  let id: string | undefined
  for (let p: MeasureContext | undefined = ctx; p !== undefined; p = p.parent) {
    if (p.metrics?.opLog !== undefined) {
      opLogMetrics = p.metrics
    }
    if (id === undefined && p.id !== undefined) {
      id = p.id
    }
  }
  const opLog = id !== undefined ? opLogMetrics?.opLog?.[id] : undefined
  let opEntry: OperationLogEntry | undefined
  if (opLog !== undefined) {
    opEntry = {
      op: name,
      start: Date.now(),
      params: {},
      end: -1
    }
  }
  const result = op(ctx)
  if (opEntry !== undefined && opLog !== undefined) {
    const entry = opEntry
    const record = (): void => {
      entry.end = Date.now()
      entry.params = { ...params, ...(typeof fullParams === 'function' ? fullParams() : fullParams) }
      opLog.ops.push(entry)
    }
    // Use then(record, record) rather than `void result.finally(record)`:
    // the promise finally() returns rejects whenever `result` rejects, and
    // discarding it would surface as an unhandled promise rejection.
    void result.then(record, record)
  }
  return result
}

View File

@ -161,7 +161,8 @@ export function metricsAggregate (m: Metrics, limit: number = -1): Metrics {
params: m.params,
value: sumVal,
topResult: m.topResult,
namedParams: m.namedParams
namedParams: m.namedParams,
opLog: m.opLog
}
}

View File

@ -25,6 +25,18 @@ export interface MetricsData {
}[]
}
/**
 * A single profiled operation recorded into an OperationLog.
 * @public
 */
export interface OperationLogEntry {
// Operation name (callers may prefix it for display purposes)
op: string
// Parameters captured when the operation completed
params: ParamsType
// Start/end timestamps from Date.now(); end === -1 while still running
start: number
end: number
}

/**
 * The set of operations collected for one registered measure context.
 * @public
 */
export interface OperationLog {
// Entries pushed as their operations complete
ops: OperationLogEntry[]
// Log start/end timestamps; end === -1 until updateOperationLog closes it
start: number
end: number
}
/**
* @public
*/
@ -32,6 +44,8 @@ export interface Metrics extends MetricsData {
namedParams: ParamsType
params: Record<string, Record<string, MetricsData>>
measurements: Record<string, Metrics>
opLog?: Record<string, OperationLog>
}
/**
@ -53,9 +67,12 @@ export interface MeasureLogger {
* @public
*/
export interface MeasureContext {
id?: string
// Create a child metrics context
newChild: (name: string, params: ParamsType, fullParams?: FullParamsType, logger?: MeasureLogger) => MeasureContext
metrics?: Metrics
with: <T>(
name: string,
params: ParamsType,

View File

@ -48,6 +48,9 @@ export interface SessionOperationContext {
op: (ctx: SessionOperationContext) => T | Promise<T>,
fullParams?: FullParamsType
) => Promise<T>
contextCache: Map<string, any>
removedMap: Map<Ref<Doc>, Doc>
}
/**

View File

@ -312,6 +312,12 @@ export class LiveQuery implements WithTx, Client {
return this.clone(q.result)[0] as WithLookup<T>
}
/**
 * Compare two FindOptions for query identity, ignoring the server-side `ctx`
 * field (a MeasureContext smuggled in via ServerFindOptions) which differs
 * per call and must not prevent query reuse.
 */
private optionsCompare (opt1?: FindOptions<Doc>, opt2?: FindOptions<Doc>): boolean {
  // Narrow assertion instead of `as any`: we only claim the extra `ctx` key.
  const { ctx: _1, ..._opt1 } = (opt1 ?? {}) as FindOptions<Doc> & { ctx?: unknown }
  const { ctx: _2, ..._opt2 } = (opt2 ?? {}) as FindOptions<Doc> & { ctx?: unknown }
  return deepEqual(_opt1, _opt2)
}
private findQuery<T extends Doc>(
_class: Ref<Class<T>>,
query: DocumentQuery<T>,
@ -319,8 +325,9 @@ export class LiveQuery implements WithTx, Client {
): Query | undefined {
const queries = this.queries.get(_class)
if (queries === undefined) return
for (const q of queries) {
if (!deepEqual(query, q.query) || !deepEqual(options, q.options)) continue
if (!deepEqual(query, q.query) || !this.optionsCompare(options, q.options)) continue
return q
}
}

View File

@ -40,7 +40,7 @@
<Button
label={getEmbeddedLabel('*')}
on:click={() => {
showPopup(Params, { params: metrics.namedParams ?? {} })
showPopup(Params, { params: metrics.namedParams ?? {}, opLog: metrics.opLog }, 'full')
}}
kind={'ghost'}
/>

View File

@ -13,20 +13,43 @@
// limitations under the License.
-->
<script lang="ts">
import type { OperationLog, OperationLogEntry } from '@hcengineering/core'
import presentation from '@hcengineering/presentation'
import { Button, FocusHandler, createFocusManager } from '@hcengineering/ui'
import { createEventDispatcher } from 'svelte'
export let params: Record<string, any>
export let opLog: Record<string, OperationLog> | undefined
const dispatch = createEventDispatcher()
const manager = createFocusManager()
/**
 * Sort log entries chronologically in place and rebase all timestamps so the
 * earliest start becomes 0. Returns the same (mutated) array.
 */
function toEntries (entries: OperationLogEntry[]): OperationLogEntry[] {
  entries.sort((a, b) => a.start - b.start)
  const first = entries[0]
  if (first !== undefined) {
    const base = first.start
    for (const entry of entries) {
      entry.start -= base
      entry.end -= base
    }
  }
  return entries
}
$: entries = toEntries(
Object.values(opLog ?? {})
.map((it, i) => it.ops.map((q) => ({ ...q, op: `#${i} ${q.op}` })))
.flat()
)
</script>
<FocusHandler {manager} />
<div class="msgbox-container">
<div class="msgbox-container w-full select-text">
<div class="overflow-label fs-title mb-4"></div>
<div class="message no-word-wrap" style:overflow={'auto'}>
{#each Object.entries(params) as kv}
@ -34,6 +57,21 @@
{kv[0]}: {typeof kv[1] === 'object' ? JSON.stringify(kv[1]) : kv[1]}
</div>
{/each}
{#each entries as op, i}
{@const hasOverlap = i > 1 && entries.slice(0, i).some((it) => it.end > op.start)}
{@const hasOverlapDown = i > 0 && entries.slice(i + 1).some((it) => it.start < op.end)}
<div class="flex-row-center select-text" style:background-color={hasOverlap || hasOverlapDown ? 'yellow' : ''}>
{op.op}
{#if hasOverlap}
⬆️
{/if}
{#if hasOverlapDown}
⬇️
{/if}
{JSON.stringify(op.params)} - {op.start} - {op.end} - ({op.end - op.start})
</div>
{/each}
</div>
<div class="footer">
<Button
@ -54,8 +92,9 @@
display: flex;
flex-direction: column;
padding: 2rem 1.75rem 1.75rem;
width: 30rem;
max-width: 40rem;
width: 100%;
max-width: 100%;
overflow: auto;
background: var(--theme-popup-color);
border-radius: 0.5rem;
user-select: none;

View File

@ -8,7 +8,7 @@
"template": "@hcengineering/node-package",
"license": "EPL-2.0",
"scripts": {
"start": "rush bundle --to @hcengineering/pod-server && cross-env NODE_ENV=production ELASTIC_INDEX_NAME=local_storage_index MODEL_VERSION=$(node ../../common/scripts/show_version.js) ACCOUNTS_URL=http://localhost:3000 REKONI_URL=http://localhost:4004 MONGO_URL=mongodb://localhost:27017 ELASTIC_URL=http://localhost:9200 FRONT_URL=http://localhost:8087 UPLOAD_URL=/upload MINIO_ENDPOINT=localhost MINIO_ACCESS_KEY=minioadmin MINIO_SECRET_KEY=minioadmin METRICS_CONSOLE=true SERVER_SECRET=secret node --inspect --enable-source-maps bundle/bundle.js",
"start": "rush bundle --to @hcengineering/pod-server && cross-env NODE_ENV=production ELASTIC_INDEX_NAME=local_storage_index MODEL_VERSION=$(node ../../common/scripts/show_version.js) ACCOUNTS_URL=http://localhost:3000 REKONI_URL=http://localhost:4004 MONGO_URL=mongodb://localhost:27017 ELASTIC_URL=http://localhost:9200 FRONT_URL=http://localhost:8087 UPLOAD_URL=/upload MINIO_ENDPOINT=localhost MINIO_ACCESS_KEY=minioadmin MINIO_SECRET_KEY=minioadmin METRICS_CONSOLE=true SERVER_SECRET=secret OPERATION_PROFILING=false node --inspect --enable-source-maps bundle/bundle.js",
"start-u": "rush bundle --to @hcengineering/pod-server && cp ./node_modules/@hcengineering/uws/lib/*.node ./bundle/ && cross-env NODE_ENV=production SERVER_PROVIDER=uweb ELASTIC_INDEX_NAME=local_storage_index MODEL_VERSION=$(node ../../common/scripts/show_version.js) ACCOUNTS_URL=http://localhost:3000 REKONI_URL=http://localhost:4004 MONGO_URL=mongodb://localhost:27017 ELASTIC_URL=http://localhost:9200 FRONT_URL=http://localhost:8087 UPLOAD_URL=/upload MINIO_ENDPOINT=localhost MINIO_ACCESS_KEY=minioadmin MINIO_SECRET_KEY=minioadmin METRICS_CONSOLE=true SERVER_SECRET=secret node --inspect bundle/bundle.js",
"start-flame": "rush bundle --to @hcengineering/pod-server && cross-env NODE_ENV=production ELASTIC_INDEX_NAME=local_storage_index MODEL_VERSION=$(node ../../common/scripts/show_version.js) ACCOUNTS_URL=http://localhost:3000 REKONI_URL=http://localhost:4004 MONGO_URL=mongodb://localhost:27017 ELASTIC_URL=http://localhost:9200 FRONT_URL=http://localhost:8087 UPLOAD_URL=/upload MINIO_ENDPOINT=localhost MINIO_ACCESS_KEY=minioadmin MINIO_SECRET_KEY=minioadmin METRICS_CONSOLE=true SERVER_SECRET=secret clinic flame --dest ./out -- node --nolazy -r ts-node/register --enable-source-maps src/__start.ts",
"build": "compile",

View File

@ -5,7 +5,7 @@
// Add this to the VERY top of the first file loaded in your app
import { Analytics } from '@hcengineering/analytics'
import contactPlugin from '@hcengineering/contact'
import { MeasureMetricsContext, newMetrics } from '@hcengineering/core'
import { MeasureMetricsContext, newMetrics, setOperationLogProfiling } from '@hcengineering/core'
import notification from '@hcengineering/notification'
import { setMetadata } from '@hcengineering/platform'
import { getMetricsContext, serverConfigFromEnv } from '@hcengineering/server'
@ -39,6 +39,8 @@ getMetricsContext(
)
)
setOperationLogProfiling(process.env.OPERATION_PROFILING === 'true')
const config = serverConfigFromEnv()
const storageConfig: StorageConfiguration = storageConfigFromEnv()

View File

@ -150,7 +150,7 @@ export async function OnUserStatus (tx: Tx, control: TriggerControl): Promise<Tx
}
async function roomJoinHandler (info: ParticipantInfo, control: TriggerControl): Promise<Tx[]> {
const roomInfos = await control.queryFind(love.class.RoomInfo, {})
const roomInfos = await control.queryFind(control.ctx, love.class.RoomInfo, {})
const roomInfo = roomInfos.find((ri) => ri.room === info.room)
if (roomInfo !== undefined) {
roomInfo.persons.push(info.person)
@ -174,7 +174,7 @@ async function roomJoinHandler (info: ParticipantInfo, control: TriggerControl):
async function rejectJoinRequests (info: ParticipantInfo, control: TriggerControl): Promise<Tx[]> {
const res: Tx[] = []
const roomInfos = await control.queryFind(love.class.RoomInfo, {})
const roomInfos = await control.queryFind(control.ctx, love.class.RoomInfo, {})
const oldRoomInfo = roomInfos.find((ri) => ri.persons.includes(info.person))
if (oldRoomInfo !== undefined) {
const restPersons = oldRoomInfo.persons.filter((p) => p !== info.person)
@ -197,7 +197,7 @@ async function rejectJoinRequests (info: ParticipantInfo, control: TriggerContro
async function setDefaultRoomAccess (info: ParticipantInfo, control: TriggerControl): Promise<Tx[]> {
const res: Tx[] = []
const roomInfos = await control.queryFind(love.class.RoomInfo, {})
const roomInfos = await control.queryFind(control.ctx, love.class.RoomInfo, {})
const oldRoomInfo = roomInfos.find((ri) => ri.persons.includes(info.person))
if (oldRoomInfo !== undefined) {
oldRoomInfo.persons = oldRoomInfo.persons.filter((p) => p !== info.person)

View File

@ -146,7 +146,7 @@ export async function isAllowed (
type: BaseNotificationType,
provider: NotificationProvider
): Promise<boolean> {
const providersSettings = await control.queryFind(notification.class.NotificationProviderSetting, {
const providersSettings = await control.queryFind(control.ctx, notification.class.NotificationProviderSetting, {
space: core.space.Workspace
})
const providerSetting = providersSettings.find(
@ -167,7 +167,7 @@ export async function isAllowed (
return false
}
const typesSettings = await control.queryFind(notification.class.NotificationTypeSetting, {
const typesSettings = await control.queryFind(control.ctx, notification.class.NotificationTypeSetting, {
space: core.space.Workspace
})
const setting = typesSettings.find(

View File

@ -1371,7 +1371,9 @@ function wrapPipeline (ctx: MeasureContext, pipeline: Pipeline, wsUrl: Workspace
{ targets: {}, txes: [] },
wsUrl,
null,
false
false,
new Map(),
new Map()
)
return {

View File

@ -154,7 +154,9 @@ class BackupWorker {
{ targets: {}, txes: [] },
wsUrl,
null,
false
false,
new Map(),
new Map()
)
const backupOps = new BackupClientOps(pipeline)

View File

@ -94,7 +94,6 @@ export type DbAdapterHandler = (
domain: Domain,
event: 'add' | 'update' | 'delete' | 'read',
count: number,
time: number,
helper: DomainHelperOperations
) => void
/**

View File

@ -22,7 +22,10 @@ import core, {
DOMAIN_TX,
TxFactory,
TxProcessor,
addOperation,
registerOperationLog,
toFindResult,
updateOperationLog,
type Account,
type AttachedDoc,
type Branding,
@ -39,8 +42,10 @@ import core, {
type Hierarchy,
type LoadModelResponse,
type MeasureContext,
type Metrics,
type Mixin,
type ModelDb,
type OperationLog,
type Ref,
type SearchOptions,
type SearchQuery,
@ -155,7 +160,7 @@ export class TServerStorage implements ServerStorage {
}
}
for (const adapter of this.adapters.values()) {
adapter.on?.((domain, event, count, time, helper) => {
adapter.on?.((domain, event, count, helper) => {
const info = this.getDomainInfo(domain)
const oldDocuments = info.documents
switch (event) {
@ -205,27 +210,27 @@ export class TServerStorage implements ServerStorage {
},
close: async () => {},
findAll: async (_class, query, options) => {
return await metrics.with('query', {}, async (ctx) => {
const results = await this.findAll(ctx, _class, query, options)
return toFindResult(
results.map((v) => {
return this.hierarchy.updateLookupMixin(_class, v, options)
}),
results.total
)
})
const _ctx: MeasureContext = (options as ServerFindOptions<Doc>)?.ctx ?? metrics
delete (options as ServerFindOptions<Doc>)?.ctx
const results = await this.findAll(_ctx, _class, query, options)
return toFindResult(
results.map((v) => {
return this.hierarchy.updateLookupMixin(_class, v, options)
}),
results.total
)
},
findOne: async (_class, query, options) => {
return (
await metrics.with('query', {}, async (ctx) => {
const results = await this.findAll(ctx, _class, query, { ...options, limit: 1 })
return toFindResult(
results.map((v) => {
return this.hierarchy.updateLookupMixin(_class, v, options)
}),
results.total
)
})
const _ctx: MeasureContext = (options as ServerFindOptions<Doc>)?.ctx ?? metrics
delete (options as ServerFindOptions<Doc>)?.ctx
const results = await this.findAll(_ctx, _class, query, { ...options, limit: 1 })
return toFindResult(
results.map((v) => {
return this.hierarchy.updateLookupMixin(_class, v, options)
}),
results.total
)[0]
},
tx: async (tx) => {
@ -264,12 +269,12 @@ export class TServerStorage implements ServerStorage {
return adapter
}
private async routeTx (ctx: MeasureContext, removedDocs: Map<Ref<Doc>, Doc>, ...txes: Tx[]): Promise<TxResult[]> {
private async routeTx (ctx: SessionOperationContext, ...txes: Tx[]): Promise<TxResult[]> {
const result: TxResult[] = []
const domainGroups = new Map<Domain, TxCUD<Doc>[]>()
const processPart = async (domain: Domain, txes: TxCUD<Doc>[]): Promise<void> => {
const routeToAdapter = async (domain: Domain, txes: TxCUD<Doc>[]): Promise<void> => {
if (txes.length > 0) {
// Find all deleted documents
@ -280,15 +285,18 @@ export class TServerStorage implements ServerStorage {
const toDeleteDocs = await ctx.with(
'adapter-load',
{ domain },
async () => await adapter.load(ctx, domain, toDelete)
async () => await adapter.load(ctx.ctx, domain, toDelete),
{ count: toDelete.length }
)
for (const ddoc of toDeleteDocs) {
removedDocs.set(ddoc._id, ddoc)
ctx.removedMap.set(ddoc._id, ddoc)
}
}
const r = await ctx.with('adapter-tx', { domain }, async (ctx) => await adapter.tx(ctx, ...txes))
const r = await ctx.with('adapter-tx', { domain }, async (ctx) => await adapter.tx(ctx.ctx, ...txes), {
txes: txes.length
})
// Update server live queries.
await this.liveQuery.tx(...txes)
@ -304,7 +312,7 @@ export class TServerStorage implements ServerStorage {
const txCUD = TxProcessor.extractTx(tx) as TxCUD<Doc>
if (!TxProcessor.isExtendsCUD(txCUD._class)) {
// Skip unsupported tx
ctx.error('Unsupported transaction', tx)
ctx.ctx.error('Unsupported transaction', tx)
continue
}
const domain = this.hierarchy.getDomain(txCUD.objectClass)
@ -317,7 +325,7 @@ export class TServerStorage implements ServerStorage {
group.push(txCUD)
}
for (const [domain, txes] of domainGroups.entries()) {
await processPart(domain, txes)
await routeToAdapter(domain, txes)
}
return result
}
@ -327,14 +335,14 @@ export class TServerStorage implements ServerStorage {
_class: Ref<Class<D>>,
modifiedBy: Ref<Account>,
modifiedOn: number,
attachedTo: D,
attachedTo: Pick<Doc, '_class' | 'space'>,
update: DocumentUpdate<D>
): Promise<Tx> {
const txFactory = new TxFactory(modifiedBy, true)
const baseClass = this.hierarchy.getBaseClass(_class)
if (baseClass !== _class) {
// Mixin operation is required.
const tx = txFactory.createTxMixin(_id, attachedTo._class, attachedTo.space, _class, update)
const tx = txFactory.createTxMixin<Doc, Doc>(_id, attachedTo._class, attachedTo.space, _class, update)
tx.modifiedOn = modifiedOn
return tx
@ -346,7 +354,11 @@ export class TServerStorage implements ServerStorage {
}
}
private async updateCollection (ctx: MeasureContext, tx: Tx, findAll: ServerStorage['findAll']): Promise<Tx[]> {
private async updateCollection (
ctx: SessionOperationContext,
tx: Tx,
findAll: ServerStorage['findAll']
): Promise<Tx[]> {
if (tx._class !== core.class.TxCollectionCUD) {
return []
}
@ -367,7 +379,7 @@ export class TServerStorage implements ServerStorage {
return []
}
const oldAttachedTo = (await findAll(ctx, _class, { _id }, { limit: 1 }))[0]
const oldAttachedTo = (await findAll(ctx.ctx, _class, { _id }, { limit: 1 }))[0]
let oldTx: Tx | null = null
if (oldAttachedTo !== undefined) {
const attr = this.hierarchy.findAttribute(oldAttachedTo._class, colTx.collection)
@ -381,7 +393,7 @@ export class TServerStorage implements ServerStorage {
const newAttachedToClass = operations.attachedToClass ?? _class
const newAttachedToCollection = operations.collection ?? colTx.collection
const newAttachedTo = (await findAll(ctx, newAttachedToClass, { _id: operations.attachedTo }, { limit: 1 }))[0]
const newAttachedTo = (await findAll(ctx.ctx, newAttachedToClass, { _id: operations.attachedTo }, { limit: 1 }))[0]
let newTx: Tx | null = null
const newAttr = this.hierarchy.findAttribute(newAttachedToClass, newAttachedToCollection)
if (newAttachedTo !== undefined && newAttr !== undefined) {
@ -399,10 +411,9 @@ export class TServerStorage implements ServerStorage {
}
private async processCollection (
ctx: MeasureContext,
ctx: SessionOperationContext,
txes: Tx[],
findAll: ServerStorage['findAll'],
removedMap: Map<Ref<Doc>, Doc>
findAll: ServerStorage['findAll']
): Promise<Tx[]> {
const result: Tx[] = []
for (const tx of txes) {
@ -424,13 +435,22 @@ export class TServerStorage implements ServerStorage {
result.push(...(await this.updateCollection(ctx, tx, findAll)))
}
if ((isCreateTx || isDeleteTx) && !removedMap.has(_id)) {
const attachedTo = (await findAll(ctx, _class, { _id }, { limit: 1 }))[0]
if ((isCreateTx || isDeleteTx) && !ctx.removedMap.has(_id)) {
// TODO: Why we need attachedTo to be found? It uses attachedTo._class, attachedTo.space only inside
// We found case for Todos, we could attach a collection with
const attachedTo = (await findAll(ctx.ctx, _class, { _id }, { limit: 1 }))[0]
if (attachedTo !== undefined) {
result.push(
await this.getCollectionUpdateTx(_id, _class, tx.modifiedBy, colTx.modifiedOn, attachedTo, {
$inc: { [colTx.collection]: isCreateTx ? 1 : -1 }
})
await this.getCollectionUpdateTx(
_id,
_class,
tx.modifiedBy,
colTx.modifiedOn,
attachedTo, // { _class: colTx.objectClass, space: colTx.objectSpace },
{
$inc: { [colTx.collection]: isCreateTx ? 1 : -1 }
}
)
)
}
}
@ -536,10 +556,9 @@ export class TServerStorage implements ServerStorage {
}
private async processRemove (
ctx: MeasureContext,
ctx: SessionOperationContext,
txes: Tx[],
findAll: ServerStorage['findAll'],
removedMap: Map<Ref<Doc>, Doc>
findAll: ServerStorage['findAll']
): Promise<Tx[]> {
const result: Tx[] = []
@ -549,29 +568,26 @@ export class TServerStorage implements ServerStorage {
continue
}
const rtx = actualTx as TxRemoveDoc<Doc>
const object = removedMap.get(rtx.objectId)
const object = ctx.removedMap.get(rtx.objectId)
if (object === undefined) {
continue
}
result.push(...(await this.deleteClassCollections(ctx, object._class, rtx.objectId, findAll, removedMap)))
result.push(...(await this.deleteClassCollections(ctx, object._class, rtx.objectId, findAll)))
const mixins = this.getMixins(object._class, object)
for (const mixin of mixins) {
result.push(
...(await this.deleteClassCollections(ctx, mixin, rtx.objectId, findAll, removedMap, object._class))
)
result.push(...(await this.deleteClassCollections(ctx, mixin, rtx.objectId, findAll, object._class)))
}
result.push(...(await this.deleteRelatedDocuments(ctx, object, findAll, removedMap)))
result.push(...(await this.deleteRelatedDocuments(ctx, object, findAll)))
}
return result
}
private async deleteClassCollections (
ctx: MeasureContext,
ctx: SessionOperationContext,
_class: Ref<Class<Doc>>,
objectId: Ref<Doc>,
findAll: ServerStorage['findAll'],
removedMap: Map<Ref<Doc>, Doc>,
to?: Ref<Class<Doc>>
): Promise<Tx[]> {
const attributes = this.hierarchy.getAllAttributes(_class, to)
@ -579,9 +595,9 @@ export class TServerStorage implements ServerStorage {
for (const attribute of attributes) {
if (this.hierarchy.isDerived(attribute[1].type._class, core.class.Collection)) {
const collection = attribute[1].type as Collection<AttachedDoc>
const allAttached = await findAll(ctx, collection.of, { attachedTo: objectId })
const allAttached = await findAll(ctx.ctx, collection.of, { attachedTo: objectId })
for (const attached of allAttached) {
result.push(...this.deleteObject(ctx, attached, removedMap))
result.push(...this.deleteObject(ctx.ctx, attached, ctx.removedMap))
}
}
}
@ -611,10 +627,9 @@ export class TServerStorage implements ServerStorage {
}
private async deleteRelatedDocuments (
ctx: MeasureContext,
ctx: SessionOperationContext,
object: Doc,
findAll: ServerStorage['findAll'],
removedMap: Map<Ref<Doc>, Doc>
findAll: ServerStorage['findAll']
): Promise<Tx[]> {
const result: Tx[] = []
const objectClass = this.hierarchy.getClass(object._class)
@ -625,10 +640,10 @@ export class TServerStorage implements ServerStorage {
)
const collector = await getResource(removeParticipand.collectDocs)
const docs = await collector(object, this.hierarchy, async (_class, query, options) => {
return await findAll(ctx, _class, query, options)
return await findAll(ctx.ctx, _class, query, options)
})
for (const d of docs) {
result.push(...this.deleteObject(ctx, d, removedMap))
result.push(...this.deleteObject(ctx.ctx, d, ctx.removedMap))
}
}
return result
@ -742,8 +757,7 @@ export class TServerStorage implements ServerStorage {
private async processDerived (
ctx: SessionOperationContext,
txes: Tx[],
findAll: ServerStorage['findAll'],
removedMap: Map<Ref<Doc>, Doc>
findAll: ServerStorage['findAll']
): Promise<Tx[]> {
const fAll =
(mctx: MeasureContext) =>
@ -754,25 +768,22 @@ export class TServerStorage implements ServerStorage {
): Promise<FindResult<T>> =>
findAll(mctx, clazz, query, options)
const removed = await ctx.with('process-remove', {}, (ctx) =>
this.processRemove(ctx.ctx, txes, findAll, removedMap)
)
const collections = await ctx.with('process-collection', {}, (ctx) =>
this.processCollection(ctx.ctx, txes, findAll, removedMap)
)
const removed = await ctx.with('process-remove', {}, (ctx) => this.processRemove(ctx, txes, findAll))
const collections = await ctx.with('process-collection', {}, (ctx) => this.processCollection(ctx, txes, findAll))
const moves = await ctx.with('process-move', {}, (ctx) => this.processMove(ctx.ctx, txes, findAll))
const applyTxes: Tx[] = []
const triggerControl: Omit<TriggerControl, 'txFactory' | 'ctx' | 'txes' | 'apply'> = {
operationContext: ctx,
removedMap,
removedMap: ctx.removedMap,
workspace: this.workspaceId,
branding: this.options.branding,
storageAdapter: this.storageAdapter,
serviceAdaptersManager: this.serviceAdaptersManager,
findAll: fAll(ctx.ctx),
findAllCtx: findAll,
contextCache: ctx.contextCache,
modelDb: this.modelDb,
hierarchy: this.hierarchy,
applyCtx: async (ctx, tx, needResult) => {
@ -784,8 +795,15 @@ export class TServerStorage implements ServerStorage {
return {}
},
// Will create a live query if missing and return values immediately if already asked.
queryFind: async (_class, query, options) => {
return await this.liveQuery.queryFind(_class, query, options)
queryFind: (ctx: MeasureContext, _class, query, options) => {
const domain = this.hierarchy.findDomain(_class)
return ctx.with('query-find', { domain }, (ctx) => {
const { ctx: octx, ...pureOptions } = ((options as ServerFindOptions<Doc>) ?? {}) as any
return addOperation(ctx, 'query-find', { domain, _class, query: query as any, options: pureOptions }, () =>
// We sure ctx is required to be passed
this.liveQuery.queryFind(_class, query, { ...options, ctx } as any)
)
})
}
}
const triggers = await ctx.with(
@ -823,16 +841,14 @@ export class TServerStorage implements ServerStorage {
{ txes: [], targets: {} },
this.workspaceId,
this.options.branding,
true
true,
ctx.removedMap,
ctx.contextCache
)
const aresult = await performAsync(applyCtx)
if (aresult.length > 0) {
await this.apply(applyCtx, aresult)
}
if (applyTxes.length > 0) {
await this.apply(applyCtx, applyTxes)
if (aresult.length > 0 || applyTxes.length > 0) {
await this.apply(applyCtx, aresult.concat(applyTxes))
}
// We need to broadcast changes
const combinedTxes = applyCtx.derived.txes.concat(aresult)
@ -855,22 +871,21 @@ export class TServerStorage implements ServerStorage {
const derived = [...removed, ...collections, ...moves, ...triggers]
return await this.processDerivedTxes(derived, ctx, findAll, removedMap)
return await this.processDerivedTxes(derived, ctx, findAll)
}
private async processDerivedTxes (
derived: Tx[],
ctx: SessionOperationContext,
findAll: ServerStorage['findAll'],
removedMap: Map<Ref<Doc>, Doc>
findAll: ServerStorage['findAll']
): Promise<Tx[]> {
derived.sort((a, b) => a.modifiedOn - b.modifiedOn)
await ctx.with('derived-route-tx', {}, (ctx) => this.routeTx(ctx.ctx, removedMap, ...derived))
await ctx.with('derived-route-tx', {}, (ctx) => this.routeTx(ctx, ...derived))
const nestedTxes: Tx[] = []
if (derived.length > 0) {
nestedTxes.push(...(await this.processDerived(ctx, derived, findAll, removedMap)))
nestedTxes.push(...(await this.processDerived(ctx, derived, findAll)))
}
const res = [...derived, ...nestedTxes]
@ -974,7 +989,6 @@ export class TServerStorage implements ServerStorage {
const modelTx: Tx[] = []
const applyTxes: Tx[] = []
const txToProcess: Tx[] = []
const removedMap = new Map<Ref<Doc>, Doc>()
const onEnds: (() => void)[] = []
const result: TxResult[] = []
const derived: Tx[] = [...txes].filter((it) => it._class !== core.class.TxApplyIf)
@ -1014,16 +1028,18 @@ export class TServerStorage implements ServerStorage {
{},
async (ctx) => await this.getAdapter(DOMAIN_TX, true).tx(ctx.ctx, ...txToStore),
{
count: txToStore.length
count: txToStore.length,
txes: Array.from(new Set(txToStore.map((it) => it._class)))
}
)
}
result.push(...(await ctx.with('routeTx', {}, (ctx) => this.routeTx(ctx.ctx, removedMap, ...txToProcess))), {
count: txToProcess.length
})
// invoke triggers and store derived objects
derived.push(...(await this.processDerived(ctx, txToProcess, _findAll, removedMap)))
if (txToProcess.length > 0) {
result.push(...(await ctx.with('routeTx', {}, (ctx) => this.routeTx(ctx, ...txToProcess))), {
count: txToProcess.length
})
// invoke triggers and store derived objects
derived.push(...(await this.processDerived(ctx, txToProcess, _findAll)))
}
// index object
const ftx = [...txToProcess, ...derived]
@ -1060,13 +1076,18 @@ export class TServerStorage implements ServerStorage {
st = Date.now()
}
let op: OperationLog | undefined
let opLogMetrics: Metrics | undefined
const result = await ctx.with(
measureName !== undefined ? `📶 ${measureName}` : 'client-tx',
{ _class: tx._class },
async (ctx) => {
;({ opLogMetrics, op } = registerOperationLog(ctx.ctx))
return await this.processTxes(ctx, [tx])
}
)
updateOperationLog(opLogMetrics, op)
if (measureName !== undefined && st !== undefined) {
;(result as TxApplyResult).serverTime = Date.now() - st

View File

@ -54,6 +54,9 @@ export interface ServerFindOptions<T extends Doc> extends FindOptions<T> {
skipClass?: boolean
skipSpace?: boolean
// Optional measure context, for server side operations
ctx?: MeasureContext
}
/**
* @public
@ -174,6 +177,8 @@ export interface TriggerControl {
modelDb: ModelDb
removedMap: Map<Ref<Doc>, Doc>
contextCache: Map<string, any>
// Since we don't have other storages let's consider adapter is MinioClient
// Later can be replaced with generic one with bucket encapsulated inside.
storageAdapter: StorageAdapter
@ -184,6 +189,7 @@ export interface TriggerControl {
// Will create a live query if missing and return values immediately if already asked.
queryFind: <T extends Doc>(
ctx: MeasureContext,
_class: Ref<Class<T>>,
query: DocumentQuery<T>,
options?: FindOptions<T>

View File

@ -2,6 +2,8 @@ import core, {
WorkspaceEvent,
generateId,
getTypeOf,
type Branding,
type BrandingMap,
type BulkUpdateEvent,
type Class,
type Doc,
@ -10,9 +12,7 @@ import core, {
type ParamsType,
type Ref,
type TxWorkspaceEvent,
type WorkspaceIdWithUrl,
type Branding,
type BrandingMap
type WorkspaceIdWithUrl
} from '@hcengineering/core'
import { type Hash } from 'crypto'
import fs from 'fs'
@ -143,7 +143,9 @@ export class SessionContextImpl implements SessionContext {
readonly derived: SessionContext['derived'],
readonly workspace: WorkspaceIdWithUrl,
readonly branding: Branding | null,
readonly isAsyncContext: boolean
readonly isAsyncContext: boolean,
readonly removedMap: Map<Ref<Doc>, Doc>,
readonly contextCache: Map<string, any>
) {}
with<T>(
@ -165,7 +167,9 @@ export class SessionContextImpl implements SessionContext {
this.derived,
this.workspace,
this.branding,
this.isAsyncContext
this.isAsyncContext,
this.removedMap,
this.contextCache
)
),
fullParams

View File

@ -22,20 +22,20 @@ import core, {
type Domain,
DOMAIN_MODEL,
DOMAIN_TX,
type FullParamsType,
generateId,
getWorkspaceId,
Hierarchy,
type MeasureContext,
MeasureMetricsContext,
ModelDb,
type ParamsType,
type Ref,
type SessionOperationContext,
SortingOrder,
type Space,
TxOperations,
type WorkspaceId,
type SessionOperationContext,
type ParamsType,
type FullParamsType
type WorkspaceId
} from '@hcengineering/core'
import {
type ContentTextAdapter,
@ -189,14 +189,17 @@ describe('mongo operations', () => {
txes: [],
targets: {}
},
with: async <T>(
with: <T>(
name: string,
params: ParamsType,
op: (ctx: SessionOperationContext) => T | Promise<T>,
fullParams?: FullParamsType
): Promise<T> => {
return await op(soCtx)
}
const result = op(soCtx)
return result instanceof Promise ? result : Promise.resolve(result)
},
contextCache: new Map(),
removedMap: new Map()
}
client = await createClient(async (handler) => {
const st: ClientConnection = {

View File

@ -143,7 +143,7 @@ export function createRawMongoDBAdapter (url: string): RawDBAdapter {
while (ops.length > 0) {
const part = ops.splice(0, skip)
try {
await ctx.with('bulk-write', {}, async () => {
await ctx.with('raw-bulk-write', {}, async () => {
await coll.bulkWrite(
part.map((it) => {
const { $unset, ...set } = it[1] as any

View File

@ -19,10 +19,12 @@ import core, {
RateLimiter,
SortingOrder,
TxProcessor,
addOperation,
cutObjectArray,
escapeLikeForRegexp,
groupByArray,
isOperator,
matchQuery,
toFindResult,
withContext,
type AttachedDoc,
@ -35,7 +37,6 @@ import core, {
type EnumOf,
type FindOptions,
type FindResult,
type FullParamsType,
type Hierarchy,
type Lookup,
type MeasureContext,
@ -82,7 +83,8 @@ import {
type FindCursor,
type FindOptions as MongoFindOptions,
type Sort,
type UpdateFilter
type UpdateFilter,
type WithId
} from 'mongodb'
import { DBCollectionHelper, getMongoClient, getWorkspaceDB, type MongoClientReference } from './utils'
@ -148,9 +150,9 @@ abstract class MongoAdapterBase implements DbAdapter {
this.handlers.push(handler)
}
handleEvent (domain: Domain, event: 'add' | 'update' | 'delete' | 'read', count: number, time: number): void {
handleEvent (domain: Domain, event: 'add' | 'update' | 'delete' | 'read', count: number): void {
for (const handler of this.handlers) {
handler(domain, event, count, time, this._db)
handler(domain, event, count, this._db)
}
}
@ -186,70 +188,75 @@ abstract class MongoAdapterBase implements DbAdapter {
clazz: Ref<Class<T>>,
query: DocumentQuery<T>,
options?: ServerFindOptions<T>
): Filter<Document> {
const translated: any = {}
): { base: Filter<Document>, lookup: Filter<Document> } {
const translatedBase: any = {}
const translatedLookup: any = {}
const mixins = new Set<Ref<Class<Doc>>>()
for (const key in query) {
const value = (query as any)[key]
const tkey = this.translateKey(key, clazz)
const tkey = this.translateKey(key, clazz, mixins)
const translated = tkey.lookup ? translatedLookup : translatedBase
if (value !== null && typeof value === 'object') {
const keys = Object.keys(value)
if (keys[0] === '$like') {
translated[tkey] = translateLikeQuery(value.$like as string)
translated[tkey.key] = translateLikeQuery(value.$like as string)
continue
}
}
translated[tkey] = value
translated[tkey.key] = value
}
if (options?.skipSpace === true) {
delete translated.space
delete translatedBase.space
}
if (options?.skipClass === true) {
delete translated._class
return translated
delete translatedBase._class
return { base: translatedBase, lookup: translatedLookup }
}
const baseClass = this.hierarchy.getBaseClass(clazz)
if (baseClass !== core.class.Doc) {
const classes = this.hierarchy.getDescendants(baseClass).filter((it) => !this.hierarchy.isMixin(it))
// Only replace if not specified
if (translated._class === undefined) {
translated._class = { $in: classes }
} else if (typeof translated._class === 'string') {
if (!classes.includes(translated._class)) {
translated._class = classes.length === 1 ? classes[0] : { $in: classes }
if (translatedBase._class === undefined) {
translatedBase._class = { $in: classes }
} else if (typeof translatedBase._class === 'string') {
if (!classes.includes(translatedBase._class)) {
translatedBase._class = classes.length === 1 ? classes[0] : { $in: classes }
}
} else if (typeof translated._class === 'object' && translated._class !== null) {
} else if (typeof translatedBase._class === 'object' && translatedBase._class !== null) {
let descendants: Ref<Class<Doc>>[] = classes
if (Array.isArray(translated._class.$in)) {
if (Array.isArray(translatedBase._class.$in)) {
const classesIds = new Set(classes)
descendants = translated._class.$in.filter((c: Ref<Class<Doc>>) => classesIds.has(c))
descendants = translatedBase._class.$in.filter((c: Ref<Class<Doc>>) => classesIds.has(c))
}
if (translated._class != null && Array.isArray(translated._class.$nin)) {
const excludedClassesIds = new Set<Ref<Class<Doc>>>(translated._class.$nin)
if (translatedBase._class != null && Array.isArray(translatedBase._class.$nin)) {
const excludedClassesIds = new Set<Ref<Class<Doc>>>(translatedBase._class.$nin)
descendants = descendants.filter((c) => !excludedClassesIds.has(c))
}
const desc = descendants.filter((it: any) => !this.hierarchy.isMixin(it as Ref<Class<Doc>>))
translated._class = desc.length === 1 ? desc[0] : { $in: desc }
translatedBase._class = desc.length === 1 ? desc[0] : { $in: desc }
}
if (baseClass !== clazz) {
if (baseClass !== clazz && !mixins.has(clazz)) {
// Add a mixin "$exists" flag
translated[clazz] = { $exists: true }
translatedBase[clazz] = { $exists: true }
}
} else {
// No need to pass _class in case of fixed domain search.
if ('_class' in translated) {
delete translated._class
if ('_class' in translatedBase) {
delete translatedBase._class
}
}
if (translated._class?.$in?.length === 1 && translated._class?.$nin === undefined) {
translated._class = translated._class.$in[0]
if (translatedBase._class?.$in?.length === 1 && translatedBase._class?.$nin === undefined) {
translatedBase._class = translatedBase._class.$in[0]
}
return translated
return { base: translatedBase, lookup: translatedLookup }
}
private async getLookupValue<T extends Doc>(
@ -444,7 +451,7 @@ abstract class MongoAdapterBase implements DbAdapter {
if (options?.sort !== undefined) {
const sort = {} as any
for (const _key in options.sort) {
const key = this.translateKey(_key, clazz)
const { key } = this.translateKey(_key, clazz)
if (typeof options.sort[_key] === 'object') {
const rules = options.sort[_key] as SortingRules<T>
@ -476,15 +483,26 @@ abstract class MongoAdapterBase implements DbAdapter {
): Promise<FindResult<T>> {
const st = Date.now()
const pipeline: any[] = []
const match = { $match: this.translateQuery(clazz, query, options) }
const tquery = this.translateQuery(clazz, query, options)
const slowPipeline = isLookupQuery(query) || isLookupSort(options?.sort)
const steps = await ctx.with('get-lookups', {}, async () => await this.getLookups(clazz, options?.lookup))
if (slowPipeline) {
if (Object.keys(tquery.base).length > 0) {
pipeline.push({ $match: tquery.base })
}
for (const step of steps) {
pipeline.push({ $lookup: step })
}
if (Object.keys(tquery.lookup).length > 0) {
pipeline.push({ $match: tquery.lookup })
}
} else {
if (Object.keys(tquery.base).length > 0) {
pipeline.push({ $match: { ...tquery.base, ...tquery.lookup } })
}
}
pipeline.push(match)
const totalPipeline: any[] = [...pipeline]
await this.fillSortPipeline(clazz, options, pipeline)
if (options?.limit !== undefined || typeof query._id === 'string') {
@ -563,18 +581,24 @@ abstract class MongoAdapterBase implements DbAdapter {
queueTime: st - stTime
})
}
this.handleEvent(domain, 'read', result.length, edTime - st)
this.handleEvent(domain, 'read', result.length)
return toFindResult(this.stripHash(result) as T[], total)
}
private translateKey<T extends Doc>(key: string, clazz: Ref<Class<T>>): string {
private translateKey<T extends Doc>(
key: string,
clazz: Ref<Class<T>>,
mixins?: Set<Ref<Class<Doc>>>
): { key: string, lookup: boolean } {
const arr = key.split('.').filter((p) => p)
let tKey = ''
let lookup = false
for (let i = 0; i < arr.length; i++) {
const element = arr[i]
if (element === '$lookup') {
tKey += arr[++i] + '_lookup'
lookup = true
} else {
if (!tKey.endsWith('.') && i > 0) {
tKey += '.'
@ -585,10 +609,10 @@ abstract class MongoAdapterBase implements DbAdapter {
}
}
// Check if key is belong to mixin class, we need to add prefix.
tKey = this.checkMixinKey<T>(tKey, clazz)
tKey = this.checkMixinKey<T>(tKey, clazz, mixins)
}
return tKey
return { key: tKey, lookup }
}
private clearExtraLookups (row: any): void {
@ -600,13 +624,14 @@ abstract class MongoAdapterBase implements DbAdapter {
}
}
private checkMixinKey<T extends Doc>(key: string, clazz: Ref<Class<T>>): string {
private checkMixinKey<T extends Doc>(key: string, clazz: Ref<Class<T>>, mixins?: Set<Ref<Class<Doc>>>): string {
if (!key.includes('.')) {
try {
const attr = this.hierarchy.findAttribute(clazz, key)
if (attr !== undefined && this.hierarchy.isMixin(attr.attributeOf)) {
// It is mixin
key = attr.attributeOf + '.' + key
mixins?.add(attr.attributeOf)
}
} catch (err: any) {
// ignore, if
@ -655,67 +680,20 @@ abstract class MongoAdapterBase implements DbAdapter {
field: string,
query?: DocumentQuery<D>
): Promise<Set<T>> {
const result = await ctx.with(
'groupBy',
{ domain },
async (ctx) => {
const coll = this.collection(domain)
const grResult = await coll
.aggregate([
...(query !== undefined ? [{ $match: query }] : []),
{
$group: {
_id: '$' + field
}
const result = await ctx.with('groupBy', { domain }, async (ctx) => {
const coll = this.collection(domain)
const grResult = await coll
.aggregate([
...(query !== undefined ? [{ $match: query }] : []),
{
$group: {
_id: '$' + field
}
])
.toArray()
return new Set(grResult.map((it) => it._id as unknown as T))
},
() => ({
findOps: this.findOps,
txOps: this.txOps
})
)
return result
}
findOps: number = 0
txOps: number = 0
opIndex: number = 0
async collectOps<T>(
ctx: MeasureContext,
domain: Domain | undefined,
operation: 'find' | 'tx',
op: (ctx: MeasureContext) => Promise<T>,
fullParam: FullParamsType
): Promise<T> {
const id = `${++this.opIndex}`
if (operation === 'find') {
this.findOps++
} else {
this.txOps++
}
const result = await ctx.with(
operation,
{ domain },
async (ctx) => await op(ctx),
() => ({
...fullParam,
id,
findOps: this.findOps,
txOps: this.txOps
})
)
if (operation === 'find') {
this.findOps--
} else {
this.txOps--
}
}
])
.toArray()
return new Set(grResult.map((it) => it._id as unknown as T))
})
return result
}
@ -726,14 +704,17 @@ abstract class MongoAdapterBase implements DbAdapter {
options?: ServerFindOptions<T>
): Promise<FindResult<T>> {
const stTime = Date.now()
return await this.findRateLimit.exec(async () => {
const st = Date.now()
const domain = options?.domain ?? this.hierarchy.getDomain(_class)
const result = await this.collectOps(
ctx,
domain,
'find',
async (ctx) => {
const mongoQuery = this.translateQuery(_class, query, options)
const fQuery = { ...mongoQuery.base, ...mongoQuery.lookup }
return await addOperation(
ctx,
'find-all',
{},
async () =>
await this.findRateLimit.exec(async () => {
const st = Date.now()
let result: FindResult<T>
const domain = options?.domain ?? this.hierarchy.getDomain(_class)
if (
options != null &&
(options?.lookup != null || this.isEnumSort(_class, options) || this.isRulesSort(options))
@ -741,7 +722,6 @@ abstract class MongoAdapterBase implements DbAdapter {
return await this.findWithPipeline(ctx, domain, _class, query, options, stTime)
}
const coll = this.collection(domain)
const mongoQuery = this.translateQuery(_class, query, options)
if (options?.limit === 1 || typeof query._id === 'string') {
// Skip sort/projection/etc.
@ -758,21 +738,31 @@ abstract class MongoAdapterBase implements DbAdapter {
findOptions.projection = this.calcProjection<T>(options, _class)
}
const doc = await coll.findOne(mongoQuery, findOptions)
let doc: WithId<Document> | null
if (typeof fQuery._id === 'string') {
doc = await coll.findOne({ _id: fQuery._id }, findOptions)
if (doc != null && matchQuery([doc as unknown as Doc], query, _class, this.hierarchy).length === 0) {
doc = null
}
} else {
doc = await coll.findOne(fQuery, findOptions)
}
let total = -1
if (options?.total === true) {
total = await coll.countDocuments(mongoQuery)
total = await coll.countDocuments({ ...mongoQuery.base, ...mongoQuery.lookup })
}
if (doc != null) {
return toFindResult([this.stripHash<T>(doc as unknown as T) as T], total)
}
return toFindResult([], total)
},
{ domain, mongoQuery }
{ domain, mongoQuery, _idOnly: typeof fQuery._id === 'string' }
)
}
let cursor = coll.find<T>(mongoQuery)
let cursor = coll.find<T>(fQuery)
if (options?.projection !== undefined) {
const projection = this.calcProjection<T>(options, _class)
@ -790,7 +780,7 @@ abstract class MongoAdapterBase implements DbAdapter {
}
if (options.limit !== undefined || typeof query._id === 'string') {
if (options.total === true) {
total = await coll.countDocuments(mongoQuery)
total = await coll.countDocuments(fQuery)
}
cursor = cursor.limit(options.limit ?? 1)
}
@ -806,6 +796,7 @@ abstract class MongoAdapterBase implements DbAdapter {
},
() => ({
size: res.length,
queueTime: stTime - st,
mongoQuery,
options,
domain
@ -814,31 +805,31 @@ abstract class MongoAdapterBase implements DbAdapter {
if (options?.total === true && options?.limit === undefined) {
total = res.length
}
return toFindResult(this.stripHash(res) as T[], total)
result = toFindResult(this.stripHash(res) as T[], total)
} catch (e) {
console.error('error during executing cursor in findAll', _class, cutObjectArray(query), options, e)
throw e
}
},
{
_class,
query,
options
}
)
const edTime = Date.now()
if (edTime - st > 1000 || st - stTime > 1000) {
ctx.error('FindAll', {
time: edTime - st,
_class,
query: cutObjectArray(query),
options,
queueTime: st - stTime
})
const edTime = Date.now()
if (edTime - st > 1000 || st - stTime > 1000) {
ctx.error('FindAll', {
time: edTime - st,
_class,
query: fQuery,
options,
queueTime: st - stTime
})
}
this.handleEvent(domain, 'read', result.length)
return result
}),
{
_class,
query: fQuery,
options
}
this.handleEvent(domain, 'read', result.length, edTime - st)
return result
})
)
}
private collectSort<T extends Doc>(
@ -916,7 +907,7 @@ abstract class MongoAdapterBase implements DbAdapter {
if (bulkUpdate.size > 1000 || flush) {
if (bulkUpdate.size > 0) {
await ctx.with(
'bulk-write',
'bulk-write-find',
{},
async () =>
await coll.bulkWrite(
@ -1036,33 +1027,40 @@ abstract class MongoAdapterBase implements DbAdapter {
while (ops.length > 0) {
const part = ops.splice(0, skip)
try {
await ctx.with('bulk-write', {}, async () => {
await coll.bulkWrite(
part.map((it) => {
const { $unset, ...set } = it[1] as any
if ($unset !== undefined) {
for (const k of Object.keys(set)) {
if ($unset[k] === '') {
// eslint-disable-next-line @typescript-eslint/no-dynamic-delete
delete $unset[k]
await ctx.with(
'bulk-update',
{},
async () => {
await coll.bulkWrite(
part.map((it) => {
const { $unset, ...set } = it[1] as any
if ($unset !== undefined) {
for (const k of Object.keys(set)) {
if ($unset[k] === '') {
// eslint-disable-next-line @typescript-eslint/no-dynamic-delete
delete $unset[k]
}
}
}
}
return {
updateOne: {
filter: { _id: it[0] },
update: {
$set: { ...set, '%hash%': null },
...($unset !== undefined ? { $unset } : {})
return {
updateOne: {
filter: { _id: it[0] },
update: {
$set: { ...set, '%hash%': null },
...($unset !== undefined ? { $unset } : {})
}
}
}
}),
{
ordered: false
}
}),
{
ordered: false
}
)
})
)
},
{
updates: part.length
}
)
} catch (err: any) {
ctx.error('failed on bulk write', { error: err, skip })
if (skip !== 1) {
@ -1082,6 +1080,7 @@ abstract class MongoAdapterBase implements DbAdapter {
}
interface OperationBulk {
bulks: number
add: Doc[]
update: Map<Ref<Doc>, Partial<Doc>>
@ -1122,6 +1121,54 @@ class MongoAdapter extends MongoAdapterBase {
}
}
bulkOps = new Map<Domain, AnyBulkWriteOperation<Doc>[]>()
async _pushBulk (ctx: MeasureContext): Promise<void> {
const bulk = Array.from(this.bulkOps.entries())
this.bulkOps.clear()
if (bulk.length === 0) {
return
}
for (const [domain, ops] of bulk) {
if (ops === undefined || ops.length === 0) {
continue
}
const coll = this.db.collection<Doc>(domain)
await this.rateLimit.exec(() =>
addOperation(
ctx,
'bulk-write',
{ domain, operations: ops.length },
async (ctx) =>
await ctx.with(
'bulk-write',
{ domain },
() =>
coll.bulkWrite(ops, {
ordered: false,
writeConcern: { w: 0 }
}),
{
domain,
operations: ops.length
}
)
)
)
}
}
async pushBulk (ctx: MeasureContext, domain: Domain, ops: AnyBulkWriteOperation<Doc>[]): Promise<void> {
const existing = this.bulkOps.get(domain)
if (existing !== undefined) {
existing.push(...ops)
} else {
this.bulkOps.set(domain, ops)
}
await this._pushBulk(ctx)
}
async tx (ctx: MeasureContext, ...txes: Tx[]): Promise<TxResult[]> {
const result: TxResult[] = []
@ -1133,105 +1180,107 @@ class MongoAdapter extends MongoAdapterBase {
return undefined
})
await this.rateLimit.exec(async () => {
for (const [domain, txs] of byDomain) {
if (domain === undefined) {
continue
}
const domainBulk: OperationBulk = {
add: [],
update: new Map(),
bulkOperations: [],
findUpdate: new Set(),
raw: []
}
for (const t of txs) {
this.updateBulk(domainBulk, t)
}
if (
domainBulk.add.length === 0 &&
domainBulk.update.size === 0 &&
domainBulk.bulkOperations.length === 0 &&
domainBulk.findUpdate.size === 0 &&
domainBulk.raw.length === 0
) {
continue
}
await this.collectOps(
ctx,
domain,
'tx',
const stTime = Date.now()
const st = Date.now()
for (const [domain, txs] of byDomain) {
if (domain === undefined) {
continue
}
const domainBulk: OperationBulk = {
bulks: 1,
add: [],
update: new Map(),
bulkOperations: [],
findUpdate: new Set(),
raw: []
}
for (const t of txs) {
this.updateBulk(domainBulk, t)
}
if (
domainBulk.add.length === 0 &&
domainBulk.update.size === 0 &&
domainBulk.bulkOperations.length === 0 &&
domainBulk.findUpdate.size === 0 &&
domainBulk.raw.length === 0
) {
continue
}
// Minor optimizations
// Add Remove optimization
const ops: AnyBulkWriteOperation<Doc>[] = []
if (domainBulk.add.length > 0) {
ops.push(...domainBulk.add.map((it) => ({ insertOne: { document: it } })))
this.handleEvent(domain, 'add', domainBulk.add.length)
}
if (domainBulk.update.size > 0) {
// Extract similar update to update many if possible
// TODO:
ops.push(
...Array.from(domainBulk.update.entries()).map((it) => ({
updateOne: {
filter: { _id: it[0] },
update: {
$set: it[1]
}
}
}))
)
this.handleEvent(domain, 'update', domainBulk.update.size)
}
if (domainBulk.bulkOperations.length > 0) {
ops.push(...domainBulk.bulkOperations)
this.handleEvent(domain, 'update', domainBulk.bulkOperations.length)
}
if (ops.length > 0) {
await this.pushBulk(ctx, domain, ops)
}
if (domainBulk.findUpdate.size > 0) {
const coll = this.db.collection<Doc>(domain)
await ctx.with(
'find-result',
{},
async (ctx) => {
const coll = this.db.collection<Doc>(domain)
const st = Date.now()
const docs = await addOperation(
ctx,
'find-result',
{},
async (ctx) => await coll.find({ _id: { $in: Array.from(domainBulk.findUpdate) } }).toArray(),
{ domain, _ids: domainBulk.findUpdate.size, queueTime: stTime - st }
)
result.push(...docs)
this.handleEvent(domain, 'read', docs.length)
},
{
domain,
queueTime: stTime - st
}
)
}
// Minor optimizations
// Add Remove optimization
if (domainBulk.add.length > 0) {
await ctx.with('insertMany', {}, async () => {
const st = Date.now()
const result = await coll.insertMany(domainBulk.add, { ordered: false })
this.handleEvent(domain, 'add', result.insertedCount, Date.now() - st)
})
}
if (domainBulk.update.size > 0) {
// Extract similar update to update many if possible
// TODO:
await ctx.with('updateMany-bulk', {}, async () => {
const st = Date.now()
const result = await coll.bulkWrite(
Array.from(domainBulk.update.entries()).map((it) => ({
updateOne: {
filter: { _id: it[0] },
update: {
$set: it[1]
}
}
})),
{
ordered: false
}
)
this.handleEvent(domain, 'update', result.modifiedCount, Date.now() - st)
})
}
if (domainBulk.bulkOperations.length > 0) {
await ctx.with('bulkWrite', {}, async () => {
const st = Date.now()
const result = await coll.bulkWrite(domainBulk.bulkOperations, {
ordered: false
})
this.handleEvent(domain, 'update', result.modifiedCount, Date.now() - st)
})
}
if (domainBulk.findUpdate.size > 0) {
await ctx.with('find-result', {}, async () => {
const st = Date.now()
const docs = await coll.find({ _id: { $in: Array.from(domainBulk.findUpdate) } }).toArray()
result.push(...docs)
this.handleEvent(domain, 'read', docs.length, Date.now() - st)
})
}
if (domainBulk.raw.length > 0) {
await ctx.with('raw', {}, async () => {
for (const r of domainBulk.raw) {
result.push({ object: await r() })
}
})
if (domainBulk.raw.length > 0) {
await ctx.with(
'raw',
{},
async (ctx) => {
for (const r of domainBulk.raw) {
result.push({ object: await addOperation(ctx, 'raw-op', {}, () => r()) })
}
},
{
domain,
add: domainBulk.add.length,
update: domainBulk.update.size,
bulk: domainBulk.bulkOperations.length,
find: domainBulk.findUpdate.size,
raw: domainBulk.raw.length
queueTime: stTime - st
}
)
}
})
}
return result
}
@ -1411,7 +1460,6 @@ class MongoAdapter extends MongoAdapterBase {
if (tx.retrieve === true) {
bulk.raw.push(async () => {
const st = Date.now()
const res = await this.collection(domain).findOneAndUpdate(
{ _id: tx.objectId },
{
@ -1424,9 +1472,8 @@ class MongoAdapter extends MongoAdapterBase {
} as unknown as UpdateFilter<Document>,
{ returnDocument: 'after', includeResultMetadata: true }
)
const dnow = Date.now() - st
this.handleEvent(domain, 'read', 1, dnow)
this.handleEvent(domain, 'update', 1, dnow)
this.handleEvent(domain, 'read', 1)
this.handleEvent(domain, 'update', 1)
return res.value as TxResult
})
} else {
@ -1475,21 +1522,46 @@ class MongoTxAdapter extends MongoAdapterBase implements TxAdapter {
await this._db.init(DOMAIN_TX)
}
txBulk: Tx[] = []
async _bulkTx (ctx: MeasureContext): Promise<void> {
const txes = this.txBulk
this.txBulk = []
const opName = txes.length === 1 ? 'tx-one' : 'tx'
await addOperation(
ctx,
opName,
{},
async (ctx) =>
await ctx.with(
'insertMany',
{ domain: 'tx' },
() =>
this.txCollection().insertMany(
txes.map((it) => translateDoc(it)),
{
ordered: false,
writeConcern: { w: 0 }
}
),
{
count: txes.length
}
),
{ domain: 'tx', count: txes.length }
)
ctx.withSync('handleEvent', {}, () => {
this.handleEvent(DOMAIN_TX, 'add', txes.length)
})
}
override async tx (ctx: MeasureContext, ...tx: Tx[]): Promise<TxResult[]> {
if (tx.length === 0) {
return []
}
const st = Date.now()
await this.collectOps(
ctx,
DOMAIN_TX,
'tx',
async () => {
await this.txCollection().insertMany(tx.map((it) => translateDoc(it)))
},
{ tx: tx.length }
)
this.handleEvent(DOMAIN_TX, 'add', tx.length, Date.now() - st)
this.txBulk.push(...tx)
await this._bulkTx(ctx)
return []
}

View File

@ -129,7 +129,9 @@ export class ClientSession implements Session {
},
this._pipeline.storage.workspaceId,
this._pipeline.storage.branding,
false
false,
new Map(),
new Map()
)
await this._pipeline.tx(context, createTx)
const acc = TxProcessor.createDoc2Doc(createTx)
@ -163,7 +165,9 @@ export class ClientSession implements Session {
},
this._pipeline.storage.workspaceId,
this._pipeline.storage.branding,
false
false,
new Map(),
new Map()
)
return await this._pipeline.findAll(context, _class, query, options)
}
@ -190,7 +194,9 @@ export class ClientSession implements Session {
},
this._pipeline.storage.workspaceId,
this._pipeline.storage.branding,
false
false,
new Map(),
new Map()
)
await ctx.sendResponse(await this._pipeline.searchFulltext(context, query, options))
}
@ -210,13 +216,18 @@ export class ClientSession implements Session {
},
this._pipeline.storage.workspaceId,
this._pipeline.storage.branding,
false
false,
new Map(),
new Map()
)
const result = await this._pipeline.tx(context, tx)
// Send result immediately
await ctx.sendResponse(result)
if (tx == null) {
return
}
// We need to combine all derived data and check if we need to send it

View File

@ -59,6 +59,7 @@ export function wipeStatistics (ctx: MeasureContext): void {
m.operations = 0
m.value = 0
m.topResult = undefined
delete (m as Metrics).opLog
if ('measurements' in m) {
for (const v of Object.values(m.measurements)) {
toClean.push(v)
@ -72,6 +73,7 @@ export function wipeStatistics (ctx: MeasureContext): void {
}
if (ctx instanceof MeasureMetricsContext) {
ctx.metrics.opLog = undefined
toClean.push(ctx.metrics)
while (toClean.length > 0) {
const v = toClean.shift()

View File

@ -132,7 +132,7 @@ async function updateDocSyncInfo (
return
}
const projects = await control.queryFind(github.mixin.GithubProject, {}, { projection: { _id: 1 } })
const projects = await control.queryFind(control.ctx, github.mixin.GithubProject, {}, { projection: { _id: 1 } })
if (projects.some((it) => it._id === (space as Ref<GithubProject>))) {
const [sdoc] = await control.findAll(github.class.DocSyncInfo, { _id: cud.objectId as Ref<DocSyncInfo> })
// We need to check if sync doc is already exists.