TSK-1237: Improve full text indexer (#3025)

Signed-off-by: Andrey Sobolev <haiodo@gmail.com>
Author: Andrey Sobolev, 2023-04-20 17:11:22 +07:00 (committed via GitHub)
Parent: 61797d225e
Commit: 76e71712e6
63 changed files with 879 additions and 373 deletions

.gitignore

@@ -80,3 +80,4 @@ tsdoc-metadata.json
 pods/front/dist
 *.cpuprofile
 *.pyc
+metrics.txt

.vscode/launch.json

@@ -37,7 +37,8 @@
       "ELASTIC_URL": "http://localhost:9200",
       "MONGO_URL": "mongodb://localhost:27017",
       "APM_SERVER_URL2": "http://localhost:8200",
-      "METRICS_CONSOLE": "true", // Show metrics in console every 30 seconds.
+      "METRICS_CONSOLE": "false",
+      "METRICS_FILE": "${workspaceRoot}/metrics.txt", // Write metrics to this file every 30 seconds.
       "MINIO_ENDPOINT": "localhost",
       "MINIO_ACCESS_KEY": "minioadmin",
       "MINIO_SECRET_KEY": "minioadmin",

@@ -66,7 +66,7 @@ export async function generateIssues (
   await connection.close()
   ctx.end()
-  console.info(metricsToString(ctx.metrics, 'Client'))
+  console.info(metricsToString(ctx.metrics, 'Client', 70))
 }

 async function genIssue (client: TxOperations, statuses: Ref<IssueStatus>[]): Promise<void> {

@@ -73,7 +73,7 @@ export async function generateContacts (
   await connection.close()
   ctx.end()
-  console.info(metricsToString(ctx.metrics, 'Client'))
+  console.info(metricsToString(ctx.metrics, 'Client', 70))
 }

 async function genVacansyApplicants (

@@ -152,7 +152,7 @@ export async function cleanRemovedTransactions (workspaceId: WorkspaceId, transa
     )
     count += toRemove.length
-    console.log('processed', count, removedDocs.total)
+    console.log('processed', count)
   }
   console.log('total docs with remove', count)

@@ -59,12 +59,13 @@ import {
   Prop,
   TypeBoolean,
   TypeIntlString,
+  TypeRecord,
   TypeRef,
   TypeString,
   TypeTimestamp,
   UX
 } from '@hcengineering/model'
-import type { IntlString } from '@hcengineering/platform'
+import { getEmbeddedLabel, IntlString } from '@hcengineering/platform'
 import core from './component'

 // C O R E
@@ -256,14 +257,17 @@ export class TFulltextData extends TDoc implements FullTextData {
 @Model(core.class.DocIndexState, core.class.Doc, DOMAIN_DOC_INDEX_STATE)
 export class TDocIndexState extends TDoc implements DocIndexState {
-  objectClass!: Ref<Class<Doc>>
+  @Prop(TypeRef(core.class.Class), core.string.Class)
+  @Index(IndexKind.Indexed)
+  @Hidden()
+  objectClass!: Ref<Class<Doc>>

   @Prop(TypeRef(core.class.Doc), core.string.AttachedTo)
   @Index(IndexKind.Indexed)
   @Hidden()
   attachedTo?: Ref<Doc>

-  @Prop(TypeRef(core.class.Doc), core.string.AttachedToClass)
+  @Prop(TypeRef(core.class.Class), core.string.AttachedToClass)
   @Index(IndexKind.Indexed)
   @Hidden()
   attachedToClass?: Ref<Class<Doc>>
@@ -271,10 +275,16 @@ export class TDocIndexState extends TDoc implements DocIndexState {
   // Indexable attributes of document.
   attributes!: Record<string, any>

-  removed!: boolean
+  @Prop(TypeBoolean(), getEmbeddedLabel('Removed'))
+  @Index(IndexKind.Indexed)
+  @Hidden()
+  removed!: boolean

   // States for different stages
-  stages!: Record<string, boolean | string>
+  @Prop(TypeRecord(), getEmbeddedLabel('Stages'))
+  @Index(IndexKind.Indexed)
+  @Hidden()
+  stages!: Record<string, boolean | string>
 }

 @Model(core.class.IndexStageState, core.class.Doc, DOMAIN_DOC_INDEX_STATE)

@@ -20,7 +20,8 @@ import {
   AttachedDoc,
   IndexingConfiguration,
   Class,
-  systemAccountEmail
+  systemAccountEmail,
+  DocIndexState
 } from '@hcengineering/core'
 import { Builder } from '@hcengineering/model'
 import core from './component'
@@ -157,4 +158,30 @@ export function createModel (builder: Builder): void {
       ]
     }
   )
+
+  builder.mixin<Class<DocIndexState>, IndexingConfiguration<TxCollectionCUD<Doc, AttachedDoc>>>(
+    core.class.DocIndexState,
+    core.class.Class,
+    core.mixin.IndexConfiguration,
+    {
+      indexes: [
+        {
+          _class: 1,
+          stages: 1,
+          _id: 1,
+          modifiedOn: 1
+        },
+        {
+          _class: 1,
+          _id: 1,
+          modifiedOn: 1
+        },
+        {
+          _class: 1,
+          _id: 1,
+          objectClass: 1
+        }
+      ]
+    }
+  )
 }

@@ -208,6 +208,10 @@ export function createModel (builder: Builder): void {
     },
     gmail.action.WriteEmail
   )
+
+  builder.mixin(gmail.class.Message, core.class.Class, core.mixin.FullTextSearchContext, {
+    parentPropagate: false
+  })
 }

 export { gmailOperation } from './migration'

@@ -1117,19 +1117,19 @@ export function createModel (builder: Builder): void {
   // Allow to use fuzzy search for mixins
   builder.mixin(recruit.class.Vacancy, core.class.Class, core.mixin.FullTextSearchContext, {
     fullTextSummary: true,
-    propogate: []
+    propagate: []
   })

   builder.mixin(recruit.mixin.Candidate, core.class.Class, core.mixin.FullTextSearchContext, {
     fullTextSummary: true,
-    propogate: [recruit.class.Applicant]
+    propagate: [recruit.class.Applicant]
   })

   // Allow to use fuzzy search for mixins
   builder.mixin(recruit.class.Applicant, core.class.Class, core.mixin.FullTextSearchContext, {
     fullTextSummary: true,
     forceIndex: true,
-    propogate: []
+    propagate: []
   })

   createAction(builder, {

@@ -170,6 +170,10 @@ export function createModel (builder: Builder): void {
     },
     telegram.ids.TxSharedCreate
   )
+
+  builder.mixin(telegram.class.Message, core.class.Class, core.mixin.FullTextSearchContext, {
+    parentPropagate: false
+  })
 }

 export { telegramOperation } from './migration'

@@ -405,7 +405,10 @@ export interface FullTextSearchContext extends Class<Doc> {
   forceIndex?: boolean

   // If defined, changes will be propagated to children of the listed classes
-  propogate?: Ref<Class<Doc>>[]
+  propagate?: Ref<Class<Doc>>[]
+
+  // Whether to propagate the child value to the parent. Default: true
+  parentPropagate?: boolean
 }

 /**
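
For reference, a hedged sketch of how a plugin model can opt into these flags; `myPlugin`, `MyComment`, and `MyReply` are hypothetical names, while the recruit and telegram hunks above are the real usages added by this commit:

// Sketch only: class names are illustrative, the mixin shape follows the
// recruit/telegram examples elsewhere in this commit.
builder.mixin(myPlugin.class.MyComment, core.class.Class, core.mixin.FullTextSearchContext, {
  // changes to a MyComment are also propagated to index states of these child classes
  propagate: [myPlugin.class.MyReply],
  // do not force re-indexing of the parent document when a MyComment changes
  parentPropagate: false
})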

@@ -25,6 +25,7 @@ import { Tx, TxCUD, TxCollectionCUD, TxCreateDoc, TxProcessor, TxUpdateDoc } from '@hcengineering/core'
 import { toFindResult } from './utils'

 const transactionThreshold = 500
+const modelTransactionThreshold = 50

 /**
  * @public
@@ -194,7 +195,11 @@ export async function createClient (
   const oldOnConnect: ((apply: boolean) => void) | undefined = conn.onConnect
   conn.onConnect = async () => {
     // Find all new transactions and apply
-    await loadModel(conn, loadedTxIds, allowedPlugins, configs, hierarchy, model)
+    if (!(await loadModel(conn, loadedTxIds, allowedPlugins, configs, hierarchy, model, true))) {
+      // We need full refresh
+      await oldOnConnect?.(false)
+      return
+    }

     // We need to look for last {transactionThreshold} transactions and if it is more since lastTx one we receive, we need to perform full refresh.
     const atxes = await conn.findAll(
@@ -216,7 +221,7 @@ export async function createClient (
       }
     }
-    if (atxes.total < transactionThreshold && !needFullRefresh) {
+    if (atxes.length < transactionThreshold && !needFullRefresh) {
       console.log('applying input transactions', atxes.length)
       for (const tx of atxes) {
         txHandler(tx)
@@ -236,8 +241,9 @@ async function loadModel (
   allowedPlugins: Plugin[] | undefined,
   configs: Map<Ref<PluginConfiguration>, PluginConfiguration>,
   hierarchy: Hierarchy,
-  model: ModelDb
-): Promise<void> {
+  model: ModelDb,
+  reload = false
+): Promise<boolean> {
   const t = Date.now()

   const atxes = await conn.findAll(
@@ -246,6 +252,10 @@ async function loadModel (
     { sort: { modifiedOn: SortingOrder.Ascending, _id: SortingOrder.Ascending } }
   )

+  if (reload && atxes.length > modelTransactionThreshold) {
+    return true
+  }
+
   let systemTx: Tx[] = []
   const userTx: Tx[] = []
   console.log('find' + (processedTx.size === 0 ? 'full model' : 'model diff'), atxes.length, Date.now() - t)
@@ -289,6 +299,7 @@ async function loadModel (
       console.error('failed to apply model transaction, skipping', JSON.stringify(tx), err)
     }
   }
+  return false
 }

 function fillConfiguration (systemTx: Tx[], configs: Map<Ref<PluginConfiguration>, PluginConfiguration>): void {

@@ -97,6 +97,7 @@ export default plugin(coreId, {
     TypeHyperlink: '' as Ref<Class<Type<Hyperlink>>>,
     TypeNumber: '' as Ref<Class<Type<number>>>,
     TypeMarkup: '' as Ref<Class<Type<string>>>,
+    TypeRecord: '' as Ref<Class<Type<Record<any, any>>>>,
     TypeBoolean: '' as Ref<Class<Type<boolean>>>,
     TypeTimestamp: '' as Ref<Class<Type<Timestamp>>>,
     TypeDate: '' as Ref<Class<Type<Timestamp | Date>>>,
@@ -151,6 +152,7 @@ export default plugin(coreId, {
     AttachedTo: '' as IntlString,
     AttachedToClass: '' as IntlString,
     String: '' as IntlString,
+    Record: '' as IntlString,
     Markup: '' as IntlString,
     Number: '' as IntlString,
     Boolean: '' as IntlString,

@@ -68,7 +68,10 @@ export function childMetrics (root: Metrics, path: string[]): Metrics {
   return oop
 }

-function aggregate (m: Metrics): Metrics {
+/**
+ * @public
+ */
+export function metricsAggregate (m: Metrics): Metrics {
   const ms = aggregateMetrics(m.measurements)

   // Use child overage, if there is no top level value specified.
@@ -105,34 +108,38 @@ function aggregate (m: Metrics): Metrics {
 function aggregateMetrics (m: Record<string, Metrics>): Record<string, Metrics> {
   const result: Record<string, Metrics> = {}
   for (const [k, v] of Object.entries(m).sort((a, b) => b[1].time - a[1].time)) {
-    result[k] = aggregate(v)
+    result[k] = metricsAggregate(v)
   }
   return result
 }

-function toLen (val: string, sep: string, len = 50): string {
+function toLen (val: string, sep: string, len: number): string {
   while (val.length < len) {
     val += sep
   }
   return val
 }

-function printMetricsChildren (params: Record<string, Metrics>, offset: number): string {
+function printMetricsChildren (params: Record<string, Metrics>, offset: number, length: number): string {
   let r = ''
   if (Object.keys(params).length > 0) {
     r += '\n' + toLen('', ' ', offset)
     r += Object.entries(params)
-      .map(([k, vv]) => toString(k, vv, offset))
+      .map(([k, vv]) => toString(k, vv, offset, length))
       .join('\n' + toLen('', ' ', offset))
   }
   return r
 }

-function printMetricsParams (params: Record<string, Record<string, MetricsData>>, offset: number): string {
+function printMetricsParams (
+  params: Record<string, Record<string, MetricsData>>,
+  offset: number,
+  length: number
+): string {
   let r = ''
   const joinP = (key: string, data: Record<string, MetricsData>): string[] => {
     return Object.entries(data).map(([k, vv]) =>
-      `${toLen('', ' ', offset)}${toLen(key + '=' + k, '-', 70 - offset)}: avg ${
+      `${toLen('', ' ', offset)}${toLen(key + '=' + k, '-', length - offset)}: avg ${
         vv.time / (vv.operations > 0 ? vv.operations : 1)
       } total: ${vv.time} ops: ${vv.operations}`.trim()
     )
@@ -145,18 +152,62 @@ function printMetricsParams (params: Record<string, Record<string, MetricsData>>
   return r
 }

-function toString (name: string, m: Metrics, offset: number): string {
-  let r = `${toLen('', ' ', offset)}${toLen(name, '-', 70 - offset)}: avg ${
+function toString (name: string, m: Metrics, offset: number, length: number): string {
+  let r = `${toLen('', ' ', offset)}${toLen(name, '-', length - offset)}: avg ${
     m.time / (m.operations > 0 ? m.operations : 1)
   } total: ${m.time} ops: ${m.operations}`.trim()
-  r += printMetricsParams(m.params, offset + 4)
-  r += printMetricsChildren(m.measurements, offset + 4)
+  r += printMetricsParams(m.params, offset + 4, length)
+  r += printMetricsChildren(m.measurements, offset + 4, length)
   return r
 }

 /**
  * @public
  */
-export function metricsToString (metrics: Metrics, name = 'System'): string {
-  return toString(name, aggregate(metrics), 0)
+export function metricsToString (metrics: Metrics, name = 'System', length: number): string {
+  return toString(name, metricsAggregate(metrics), 0, length)
+}
+
+function printMetricsParamsRows (
+  params: Record<string, Record<string, MetricsData>>,
+  offset: number
+): (string | number)[][] {
+  const r: (string | number)[][] = []
+  function joinP (key: string, data: Record<string, MetricsData>): (string | number)[][] {
+    return Object.entries(data).map(([k, vv]) => [
+      offset,
+      `${key}=${k}`,
+      vv.time / (vv.operations > 0 ? vv.operations : 1),
+      vv.time,
+      vv.operations
+    ])
+  }
+  for (const [k, v] of Object.entries(params)) {
+    r.push(...joinP(k, v))
+  }
+  return r
+}
+
+function printMetricsChildrenRows (params: Record<string, Metrics>, offset: number): (string | number)[][] {
+  const r: (string | number)[][] = []
+  if (Object.keys(params).length > 0) {
+    Object.entries(params).forEach(([k, vv]) => r.push(...toStringRows(k, vv, offset)))
+  }
+  return r
+}
+
+function toStringRows (name: string, m: Metrics, offset: number): (number | string)[][] {
+  const r: (number | string)[][] = [
+    [offset, name, m.time / (m.operations > 0 ? m.operations : 1), m.time, m.operations]
+  ]
+  r.push(...printMetricsParamsRows(m.params, offset + 1))
+  r.push(...printMetricsChildrenRows(m.measurements, offset + 1))
+  return r
+}
+
+/**
+ * @public
+ */
+export function metricsToRows (metrics: Metrics, name = 'System'): (number | string)[][] {
+  return toStringRows(name, metricsAggregate(metrics), 0)
 }
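
A hedged usage sketch of the reworked metrics API; `newMetrics` is assumed to be exported from '@hcengineering/core' alongside these helpers, as the server plugin hunk later in this commit suggests:

import { metricsToRows, metricsToString, newMetrics } from '@hcengineering/core'

const metrics = newMetrics()
// ...measurements are recorded elsewhere through a MeasureMetricsContext...

// Text report: the new third argument is the report width in columns.
console.info(metricsToString(metrics, 'System', 140))

// Row form, as consumed by ServerStatistics.svelte below: [offset, name, avg, total, ops].
for (const row of metricsToRows(metrics, 'System')) {
  console.info(' '.repeat(Number(row[0])), ...row.slice(1))
}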

@@ -121,6 +121,9 @@ export type FindOptions<T extends Doc> = {
   sort?: SortingQuery<T>
   lookup?: Lookup<T>
   projection?: Projection<T>
+
+  // If specified, total will be returned
+  total?: boolean
 }

 /**
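
A minimal sketch of opting in; `client`, `task`, and `space` stand in for a real client, plugin, and space id (the TodoItem and attachment hunks below are the commit's own usages):

// Without total: true, implementations may skip the extra count query,
// so FindResult.total is only meaningful when requested.
const result = await client.findAll(task.class.TodoItem, { space }, { limit: 4, total: true })
console.log(`showing ${result.length} of ${result.total}`)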

@@ -172,7 +172,11 @@ export function extractDocKey (key: string): {
  * @public
  */
 export function isFullTextAttribute (attr: AnyAttribute): boolean {
-  return attr.index === IndexKind.FullText || attr.type._class === core.class.TypeAttachment
+  return (
+    attr.index === IndexKind.FullText ||
+    attr.type._class === core.class.TypeAttachment ||
+    attr.type._class === core.class.EnumOf
+  )
 }

 /**

@@ -383,6 +383,13 @@ export function TypeMarkup (): Type<Markup> {
   return { _class: core.class.TypeMarkup, label: core.string.Markup }
 }

+/**
+ * @public
+ */
+export function TypeRecord (): Type<Markup> {
+  return { _class: core.class.TypeRecord, label: core.string.Record }
+}
+
 /**
  * @public
  */

@@ -8,6 +8,7 @@
     "resolveJsonModule": true,
     "types": ["heft-jest"],
     "skipLibCheck": true,
-    "incremental": true
+    "incremental": true,
+    "declarationMap": true
   }
 }

@@ -9,6 +9,7 @@
     "skipLibCheck": true,
     "incremental": true,
     "esModuleInterop": true,
+    "declarationMap": true,
     "lib": [
       "esnext",
       "dom"

@@ -54,7 +54,10 @@ export function removeEventListener (event: string, listener: EventListener): void {
   }
 }

-async function broadcastEvent (event: string, data: any): Promise<void> {
+/**
+ * @public
+ */
+export async function broadcastEvent (event: string, data: any): Promise<void> {
   const listeners = eventListeners.get(event)
   if (listeners !== undefined) {
     const promises = listeners.map(async (listener) => await listener(event, data))
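
With `broadcastEvent` public, other packages can emit platform events directly; a hedged sketch pairing it with `addEventListener` (the client plugin below uses exactly this pair for its NetworkRequests event):

import { addEventListener, broadcastEvent } from '@hcengineering/platform'

// Hypothetical event id for illustration; the real one added by this commit
// is client.event.NetworkRequests.
const myEvent = 'my-plugin:something-happened'

addEventListener(myEvent, async (event, data) => {
  console.log('received', event, data)
})

void broadcastEvent(myEvent, { hello: 'world' })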

@@ -73,7 +73,7 @@
       />
     </div>
   </div>
-  <Scroller>
+  <Scroller horizontal>
     <div class="antiCard-content">
       <slot />
     </div>

@@ -11,6 +11,9 @@
   const client = getClient()

   function getContent (extra: string[], value: string): string[] {
+    if (value == null || value === '') {
+      return []
+    }
     const result = extra.includes('base64') ? decodeURIComponent(escape(atob(value))) : value

     return `${result}`.split('\n')
@@ -43,8 +46,12 @@
       {#if summary}
         {#if search.length > 0}
           Result:
-          {#each summary.split('\n').filter((line) => line.toLowerCase().includes(search.toLowerCase())) as line}
-            <span class:highlight={true}>{line}</span>
+          {#each summary.split('\n').filter((line, idx, arr) => {
+            return line.toLowerCase().includes(search.toLowerCase()) || arr[idx - 1]
+              ?.toLowerCase()
+              .includes(search.toLowerCase())
+          }) as line}
+            <span class:highlight={line.toLowerCase().includes(search.toLowerCase())}>{line}</span>
           {/each}
           <br />
         {/if}

@@ -53,7 +53,7 @@
     display: grid;
     overflow: auto;
     min-width: 50rem;
-    max-width: 200rem;
+    max-width: 80rem;
   }
   .indexed-background {
     background-color: white;

@@ -19,7 +19,6 @@
   import presentation from '..'
   import { getFileUrl } from '../utils'
   import Download from './icons/Download.svelte'
-  import IndexedDocumentPreview from './IndexedDocumentPreview.svelte'

   export let file: string
   export let name: string
@@ -79,8 +78,6 @@
     <img class="img-fit" src={getFileUrl(file)} alt="" />
   </div>
   <div class="space" />
-{:else if contentType && contentType.startsWith('application/msword')}
-  <IndexedDocumentPreview objectId={value._id} noPanel />
 {:else}
   <iframe
     class="pdfviewer-content"

@@ -342,7 +342,9 @@ export class LiveQuery extends TxProcessor implements Client {
       const pos = q.result.findIndex((p) => p._id === _id)
       if (pos !== -1) {
         q.result.splice(pos, 1)
-        q.total--
+        if (q.options?.total === true) {
+          q.total--
+        }
       }
     }
   } else {
@@ -369,7 +371,9 @@ export class LiveQuery extends TxProcessor implements Client {
         return true
       } else {
         q.result.splice(pos, 1)
-        q.total--
+        if (q.options?.total === true) {
+          q.total--
+        }
       }
     }
     return false
@@ -397,7 +401,9 @@ export class LiveQuery extends TxProcessor implements Client {
         return true
       } else {
         q.result.splice(pos, 1)
-        q.total--
+        if (q.options?.total === true) {
+          q.total--
+        }
       }
     } else {
       q.result[pos] = updatedDoc
@@ -664,7 +670,9 @@ export class LiveQuery extends TxProcessor implements Client {
       q.result[pos] = doc
     } else {
       q.result.push(doc)
-      q.total++
+      if (q.options?.total === true) {
+        q.total++
+      }
     }
     return true
   }
@@ -770,7 +778,9 @@ export class LiveQuery extends TxProcessor implements Client {
     }

     q.result.push(doc)
-    q.total++
+    if (q.options?.total === true) {
+      q.total++
+    }

     if (q.options?.sort !== undefined) {
       await resultSort(q.result, q.options?.sort, q._class, this.getHierarchy(), this.client.getModel())
@@ -874,7 +884,9 @@ export class LiveQuery extends TxProcessor implements Client {
     const index = q.result.findIndex((p) => p._id === tx.objectId)
     if (index > -1) {
       q.result.splice(index, 1)
-      q.total--
+      if (q.options?.total === true) {
+        q.total--
+      }
       await this.callback(q)
     }
     await this.handleDocRemoveLookup(q, tx)
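
Sketched from the consumer side (assuming `createQuery` from '@hcengineering/presentation', as in the TodoItem hunk below), the effect of the guards above is that `result.total` is only maintained across incremental create/remove updates when the live query was created with the option set:

const query = createQuery()
query.query(
  task.class.TodoItem,
  { space },
  (result) => {
    // result.total stays correct on live create/remove only because of { total: true }
    console.log(result.length, 'of', result.total)
  },
  { total: true }
)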

@@ -1,7 +1,6 @@
 <script lang="ts">
+  import { addEventListener, getMetadata, OK, PlatformEvent } from '@hcengineering/platform'
   import { onDestroy } from 'svelte'
-  import { getMetadata, OK } from '@hcengineering/platform'
-  import { PlatformEvent, addEventListener } from '@hcengineering/platform'
   import type { AnyComponent } from '../../types'
   // import { applicationShortcutKey } from '../../utils'
   import { getCurrentLocation, location, navigate } from '../../location'
@@ -12,14 +11,14 @@
   import StatusComponent from '../Status.svelte'
   import Clock from './Clock.svelte'
   // import Mute from './icons/Mute.svelte'
-  import WiFi from './icons/WiFi.svelte'
+  import { checkMobile, deviceOptionsStore as deviceInfo, networkStatus } from '../../'
+  import uiPlugin from '../../plugin'
+  import FontSizeSelector from './FontSizeSelector.svelte'
   import Computer from './icons/Computer.svelte'
   import Phone from './icons/Phone.svelte'
-  import ThemeSelector from './ThemeSelector.svelte'
-  import FontSizeSelector from './FontSizeSelector.svelte'
+  import WiFi from './icons/WiFi.svelte'
   import LangSelector from './LangSelector.svelte'
-  import uiPlugin from '../../plugin'
-  import { checkMobile, deviceOptionsStore as deviceInfo } from '../../'
+  import ThemeSelector from './ThemeSelector.svelte'

   let application: AnyComponent | undefined
@@ -130,8 +129,17 @@
         size={'small'}
       />
     </div>
-    <div class="flex-center widget cursor-pointer mr-3">
-      <WiFi size={'small'} />
+    <!-- svelte-ignore a11y-click-events-have-key-events -->
+    <div
+      class="flex-center widget cursor-pointer mr-3"
+      on:click={(evt) => {
+        getMetadata(uiPlugin.metadata.ShowNetwork)?.(evt)
+      }}
+    >
+      <WiFi
+        size={'small'}
+        fill={$networkStatus === -1 ? 'red' : $networkStatus % 2 === 1 ? 'blue' : 'currentColor'}
+      />
     </div>
   </div>
 </div>

@@ -1,17 +1,20 @@
 <script lang="ts">
   export let size: 'small' | 'medium' | 'large'
-  const fill: string = 'currentColor'
+  export let fill: string = 'currentColor'
 </script>

 <svg class="svg-{size}" {fill} viewBox="0 0 24 24" xmlns="http://www.w3.org/2000/svg">
-  <g>
+  <g stroke={fill}>
     <path
+      stroke-width={fill !== 'currentColor' ? 2 : 1}
       d="M12,10.9c-2.7,0-5.1,1.1-6.8,3.1C5,14.3,5,14.7,5.4,14.9s0.7,0.2,0.9-0.1c1.4-1.6,3.5-2.5,5.8-2.5s4.4,0.9,5.8,2.6 c0.1,0.1,0.3,0.2,0.6,0.2c0.1,0,0.3,0,0.5-0.1c0.3-0.2,0.3-0.7,0.1-0.9C17.2,11.9,14.7,10.9,12,10.9z"
     />
     <path
+      stroke-width={fill !== 'currentColor' ? 2 : 1}
       d="M12,17.9c-1.1,0-2.3,0.5-2.9,1.1c-0.2,0.2-0.2,0.7,0,0.9c0.2,0.2,0.7,0.2,0.9,0c0.9-1,3.1-1,4.1,0c0.1,0.1,0.3,0.2,0.5,0.2 c0.2,0,0.3-0.1,0.5-0.2c0.2-0.2,0.2-0.7,0-0.9C14.3,18.4,13.1,17.9,12,17.9z"
     />
     <path
+      stroke-width={fill !== 'currentColor' ? 2 : 1}
       d="M23.9,9.7C20.8,6,16.5,3.8,12,3.8S3.2,6,0.2,9.7c-0.2,0.3-0.2,0.7,0.1,0.9c0.3,0.2,0.7,0.2,0.9-0.1 c2.7-3.4,6.7-5.3,10.7-5.3s8,1.9,10.7,5.4c0.1,0.1,0.3,0.2,0.6,0.2c0.1,0,0.3,0,0.5-0.1C24,10.5,24.1,10,23.9,9.7z"
     />
   </g>

@@ -74,7 +74,10 @@ export const uis = plugin(uiId, {
   },
   metadata: {
     DefaultApplication: '' as Metadata<AnyComponent>,
-    Routes: '' as Metadata<Map<string, AnyComponent>>
+    Routes: '' as Metadata<Map<string, AnyComponent>>,
+
+    // Handler invoked when the network status button is clicked
+    ShowNetwork: '' as Metadata<(evt: MouseEvent) => void>
   }
 })

@@ -16,6 +16,7 @@
 import { generateId } from '@hcengineering/core'
 import type { Metadata } from '@hcengineering/platform'
 import { setMetadata } from '@hcengineering/platform'
+import { writable } from 'svelte/store'
 import { Notification, NotificationPosition, NotificationSeverity, notificationsStore } from '.'
 import { AnyComponent, AnySvelteComponent } from './types'
@@ -99,3 +100,8 @@ export function tableToCSV (tableId: string, separator = ','): string {
   }
   return csv.join('\n')
 }
+
+/**
+ * @public
+ */
+export const networkStatus = writable<number>(0)
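
A hedged sketch of consuming the new store outside a Svelte template (inside a component, the `$networkStatus` auto-subscription in the StatusBar hunk above is enough):

import { networkStatus } from '@hcengineering/ui'

// Per the client connection code below, the value is the number of in-flight
// requests, or -1 after a websocket error.
const unsubscribe = networkStatus.subscribe((status) => {
  console.log(status === -1 ? 'connection error' : `${status} pending requests`)
})
// later: unsubscribe()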

@@ -16,14 +16,19 @@
   })

   const query = createQuery()
   let done: number, total: number, item: TodoItem
-  $: query.query(task.class.TodoItem, { space: value.space, attachedTo: { $in: todoLists } }, (result) => {
-    total = result.total
-    done = result.filter((t) => t.done).length
-    if (!total) return
-    item = result.reduce((min, cur) =>
-      cur.dueTo === null ? min : min.dueTo === null || cur.dueTo < min.dueTo ? cur : min
-    )
-  })
+  $: query.query(
+    task.class.TodoItem,
+    { space: value.space, attachedTo: { $in: todoLists } },
+    (result) => {
+      total = result.total
+      done = result.filter((t) => t.done).length
+      if (!total) return
+      item = result.reduce((min, cur) =>
+        cur.dueTo === null ? min : min.dueTo === null || cur.dueTo < min.dueTo ? cur : min
+      )
+    },
+    { total: true }
+  )
 </script>

 {#if value && (total ?? 0) > 0}

@@ -61,7 +61,7 @@ export function getDateIcon (item: TodoItem): 'normal' | 'warning' | 'overdue' {
 export const commonBoardPreference = readable<CommonBoardPreference>(undefined, (set) => {
   createQuery().query(board.class.CommonBoardPreference, { attachedTo: board.app.Board }, (result) => {
-    if (result.total > 0) return set(result[0])
+    if (result.length > 0) return set(result[0])
     void getClient().createDoc(board.class.CommonBoardPreference, preference.space.Preference, {
       attachedTo: board.app.Board
     })

@@ -67,7 +67,8 @@
     },
     {
       limit: ATTACHEMNTS_LIMIT,
-      sort
+      sort,
+      total: true
     }
   )
 </script>

@@ -79,7 +79,8 @@
     lookup,
     sort: {
       createOn: SortingOrder.Descending
-    }
+    },
+    total: true
   }
   if (!showAll) {
     options.limit = 4

@@ -37,6 +37,7 @@ import {
   PlatformError,
   ReqId,
   UNAUTHORIZED,
+  broadcastEvent,
   getMetadata,
   readResponse,
   serialize,
@@ -152,6 +153,8 @@ class Connection implements ClientConnection {
   sockets = 0

+  incomingTimer: any
+
   private openConnection (): Promise<ClientSocket> {
     return new Promise((resolve, reject) => {
       // Use defined factory or browser default one.
@@ -196,6 +199,7 @@ class Connection implements ClientConnection {
         } else {
           promise.resolve(resp.result)
         }
+        void broadcastEvent(client.event.NetworkRequests, this.requests.size)
       } else {
         const tx = resp.result as Tx
         if (
@@ -214,6 +218,13 @@ class Connection implements ClientConnection {
           return
         }
         this.handler(tx)
+
+        clearTimeout(this.incomingTimer)
+        void broadcastEvent(client.event.NetworkRequests, this.requests.size + 1)
+
+        this.incomingTimer = setTimeout(() => {
+          void broadcastEvent(client.event.NetworkRequests, this.requests.size)
+        }, 500)
       }
     }
     websocket.onclose = (ev) => {
@@ -222,6 +233,7 @@ class Connection implements ClientConnection {
       if (!(this.websocket instanceof Promise)) {
         this.websocket = null
       }
+      void broadcastEvent(client.event.NetworkRequests, -1)
       reject(new Error('websocket error'))
     }
     websocket.onopen = () => {
@@ -237,6 +249,7 @@ class Connection implements ClientConnection {
     }
     websocket.onerror = (event: any) => {
       console.error('client websocket error:', socketId, event)
+      void broadcastEvent(client.event.NetworkRequests, -1)
       reject(new Error(`websocket error:${socketId}`))
     }
   })
@@ -251,6 +264,7 @@ class Connection implements ClientConnection {
     if (this.closed) {
       throw new PlatformError(unknownError('connection closed'))
     }
+
     const id = this.lastId++
     const promise = new RequestPromise(data.method, data.params)
@@ -280,6 +294,7 @@ class Connection implements ClientConnection {
       }, 500)
     }
     await sendData()
+    void broadcastEvent(client.event.NetworkRequests, this.requests.size)
     return await promise.promise
   }

@@ -80,5 +80,8 @@ export default plugin(clientId, {
   },
   function: {
     GetClient: '' as Resource<ClientFactory>
+  },
+  event: {
+    NetworkRequests: '' as Metadata<string>
   }
 })

@@ -13,7 +13,7 @@
 // limitations under the License.
 -->
 <script lang="ts">
-  import core, { Enum, EnumOf, Ref } from '@hcengineering/core'
+  import core, { Enum, EnumOf, IndexKind, Ref } from '@hcengineering/core'
   import { TypeEnum } from '@hcengineering/model'
   import presentation, { getClient } from '@hcengineering/presentation'
   import { Button, Label, showPopup } from '@hcengineering/ui'
@@ -40,7 +40,7 @@
   function changeEnum (value: Enum) {
     type = TypeEnum(value._id)
-    dispatch('change', { type, defaultValue })
+    dispatch('change', { type, defaultValue, index: IndexKind.FullText })
   }

   async function updateSelected (ref: Ref<Enum> | undefined) {

@@ -68,9 +68,6 @@
   $: selected = !Array.isArray(value) ? ('attachedTo' in value ? value.attachedTo : undefined) : undefined
   $: ignoreObjects = !Array.isArray(value) ? ('_id' in value ? [value._id] : []) : undefined
   $: docQuery = {
-    '$lookup.status.category': {
-      $nin: [tracker.issueStatusCategory.Completed, tracker.issueStatusCategory.Canceled]
-    },
     'parents.parentId': {
       $nin: [
         ...new Set(

@@ -117,7 +117,7 @@
       dispatch('content', objects)
       loading = loading === 1 ? 0 : -1
     },
-    { sort, limit, ...options, lookup }
+    { sort, limit, ...options, lookup, total: true }
   )
   if (update && ++loading > 0) {
     objects = []

@@ -0,0 +1,53 @@
+<script lang="ts">
+  import { metricsToRows } from '@hcengineering/core'
+  import { getEmbeddedLabel } from '@hcengineering/platform'
+  import { Card } from '@hcengineering/presentation'
+  import { ticker } from '@hcengineering/ui'
+  import { onDestroy } from 'svelte'
+
+  export let endpoint: string
+
+  let data: any
+
+  onDestroy(
+    ticker.subscribe(() => {
+      fetch(endpoint, {
+        headers: {
+          'Content-Type': 'application/json'
+        }
+      }).then(async (json) => {
+        data = await json.json()
+      })
+    })
+  )
+</script>
+
+<Card on:close fullSize label={getEmbeddedLabel('Statistics')} okAction={() => {}} okLabel={getEmbeddedLabel('Ok')}>
+  {#if data}
+    {JSON.stringify(data.activeSessions, null, 2)}
+    <table class="antiTable" class:highlightRows={true}>
+      <thead class="scroller-thead">
+        <tr>
+          <th>Name</th>
+          <th>Average</th>
+          <th>Total</th>
+          <th>Ops</th>
+        </tr>
+      </thead>
+      <tbody>
+        {#each metricsToRows(data.metrics, 'System') as row}
+          <tr class="antiTable-body__row">
+            <td>
+              <span style={`padding-left: ${row[0]}rem;`}>
+                {row[1]}
+              </span>
+            </td>
+            <td>{row[2]}</td>
+            <td>{row[3]}</td>
+            <td>{row[4]}</td>
+          </tr>
+        {/each}
+      </tbody>
+    </table>
+  {/if}
+</Card>

@@ -14,7 +14,7 @@
 -->
 <script lang="ts">
   import { getMetadata } from '@hcengineering/platform'
-  import { Component, Loading, location, Notifications } from '@hcengineering/ui'
+  import { Component, Loading, Notifications, location } from '@hcengineering/ui'
   import { connect, versionError } from '../connect'
   import { workbenchId } from '@hcengineering/workbench'
View File

@ -1,16 +1,28 @@
import client from '@hcengineering/client' import client from '@hcengineering/client'
import contact from '@hcengineering/contact' import contact from '@hcengineering/contact'
import core, { Client, setCurrentAccount, Version, versionToString } from '@hcengineering/core' import core, { AccountRole, Client, setCurrentAccount, Version, versionToString } from '@hcengineering/core'
import login, { loginId } from '@hcengineering/login' import login, { loginId } from '@hcengineering/login'
import { getMetadata, getResource, setMetadata } from '@hcengineering/platform' import { addEventListener, getMetadata, getResource, setMetadata } from '@hcengineering/platform'
import presentation, { refreshClient, setClient } from '@hcengineering/presentation' import presentation, { refreshClient, setClient } from '@hcengineering/presentation'
import { fetchMetadataLocalStorage, getCurrentLocation, navigate, setMetadataLocalStorage } from '@hcengineering/ui' import ui, {
fetchMetadataLocalStorage,
getCurrentLocation,
navigate,
networkStatus,
setMetadataLocalStorage,
showPopup
} from '@hcengineering/ui'
import ServerStatistics from './components/ServerStatistics.svelte'
export let versionError: string | undefined = '' export let versionError: string | undefined = ''
let _token: string | undefined let _token: string | undefined
let _client: Client | undefined let _client: Client | undefined
addEventListener(client.event.NetworkRequests, async (event: string, val: number) => {
networkStatus.set(val)
})
export async function connect (title: string): Promise<Client | undefined> { export async function connect (title: string): Promise<Client | undefined> {
const loc = getCurrentLocation() const loc = getCurrentLocation()
const ws = loc.path[1] const ws = loc.path[1]
@ -105,13 +117,13 @@ export async function connect (title: string): Promise<Client | undefined> {
version = await _client.findOne<Version>(core.class.Version, {}) version = await _client.findOne<Version>(core.class.Version, {})
console.log('Model version', version) console.log('Model version', version)
const requirdVersion = getMetadata(presentation.metadata.RequiredVersion) const requiredVersion = getMetadata(presentation.metadata.RequiredVersion)
if (requirdVersion !== undefined && version !== undefined) { if (requiredVersion !== undefined && version !== undefined) {
console.log('checking min model version', requirdVersion) console.log('checking min model version', requiredVersion)
const versionStr = versionToString(version) const versionStr = versionToString(version)
if (version === undefined || requirdVersion !== versionStr) { if (version === undefined || requiredVersion !== versionStr) {
versionError = `${versionStr} => ${requirdVersion}` versionError = `${versionStr} => ${requiredVersion}`
return undefined return undefined
} }
} }
@ -129,6 +141,18 @@ export async function connect (title: string): Promise<Client | undefined> {
document.title = [ws, title].filter((it) => it).join(' - ') document.title = [ws, title].filter((it) => it).join(' - ')
await setClient(_client) await setClient(_client)
if (me.role === AccountRole.Owner) {
setMetadata(ui.metadata.ShowNetwork, (evt: MouseEvent) => {
showPopup(
ServerStatistics,
{
endpoint: endpoint.replace(/^ws/g, 'http') + '/' + token
},
'top'
)
})
}
return _client return _client
} }

@@ -4,7 +4,7 @@ import { writeFile } from 'fs/promises'
 const metricsFile = process.env.METRICS_FILE
 const metricsConsole = (process.env.METRICS_CONSOLE ?? 'false') === 'true'

-const METRICS_UPDATE_INTERVAL = 30000
+const METRICS_UPDATE_INTERVAL = !metricsConsole ? 1000 : 30000
 const metrics = newMetrics()

 export const metricsContext = new MeasureMetricsContext('System', {}, metrics)
@@ -14,7 +14,7 @@ if (metricsFile !== undefined || metricsConsole) {
   let oldMetricsValue = ''

   const intTimer = setInterval(() => {
-    const val = metricsToString(metrics)
+    const val = metricsToString(metrics, 'System', 140)
     if (val !== oldMetricsValue) {
       oldMetricsValue = val
       if (metricsFile !== undefined) {

@@ -217,7 +217,7 @@ export function start (
   const stages: FullTextPipelineStage[] = []

   // Add regular stage to for indexable fields change tracking.
-  stages.push(new IndexedFieldStage(storage, fullText.newChild('fields', {})))
+  stages.push(new IndexedFieldStage(storage))

   // Obtain text content from storage(like minio) and use content adapter to convert files to text content.
   stages.push(new ContentRetrievalStage(storageAdapter, workspace, fullText.newChild('content', {}), contentAdapter))
@@ -232,16 +232,16 @@ export function start (
   // stages.push(retranslateStage)

   // Summary stage
-  const summaryStage = new FullSummaryStage()
+  const summaryStage = new FullSummaryStage(storage)

   stages.push(summaryStage)

   // Push all content to elastic search
-  const pushStage = new FullTextPushStage(adapter, workspace, fullText.newChild('push', {}))
+  const pushStage = new FullTextPushStage(storage, adapter, workspace)
   stages.push(pushStage)

   // OpenAI prepare stage
-  const openAIStage = new OpenAIEmbeddingsStage(adapter, fullText.newChild('embeddings', {}), workspace)
+  const openAIStage = new OpenAIEmbeddingsStage(adapter, workspace)
   // We depend on all available stages.
   openAIStage.require = stages.map((it) => it.stageId)

@@ -301,7 +301,7 @@ export async function DMTrigger (tx: Tx, control: TriggerControl): Promise<Tx[]>
   }
   const doc = TxProcessor.createDoc2Doc(actualTx as TxCreateDoc<ChunterMessage>)
   const dms = await control.findAll(chunter.class.DirectMessage, { _id: doc.space })
-  if (dms.total === 0) {
+  if (dms.length === 0) {
     return []
   }
   const sender = await getEmployeeAccountById(ctx.tx.modifiedBy, control)

@@ -102,7 +102,7 @@ export class OpenAIEmbeddingsStage implements FullTextPipelineStage {
   async update (doc: DocIndexState, update: DocumentUpdate<DocIndexState>): Promise<void> {}

-  constructor (readonly adapter: FullTextAdapter, readonly metrics: MeasureContext, readonly workspaceId: WorkspaceId) {}
+  constructor (readonly adapter: FullTextAdapter, readonly workspaceId: WorkspaceId) {}

   updateSummary (summary: FullSummaryStage): void {
     summary.fieldFilter.push((attr, value) => {
@@ -249,7 +249,6 @@ export class OpenAIEmbeddingsStage implements FullTextPipelineStage {
     const queryString = query.$search.replace('\n ', ' ')
     const embeddingData = await this.getEmbedding(queryString)
     const embedding = embeddingData.data[0].embedding
-    console.log('search embedding', embedding)
     const docs = await this.adapter.searchEmbedding(_classes, query, embedding, {
       size,
       from,
@@ -265,7 +264,7 @@ export class OpenAIEmbeddingsStage implements FullTextPipelineStage {
     }
   }

-  async collect (toIndex: DocIndexState[], pipeline: FullTextPipeline): Promise<void> {
+  async collect (toIndex: DocIndexState[], pipeline: FullTextPipeline, metrics: MeasureContext): Promise<void> {
     if (!this.enabled) {
       return
     }
@@ -273,12 +272,12 @@ export class OpenAIEmbeddingsStage implements FullTextPipelineStage {
       if (pipeline.cancelling) {
         return
       }
-      await this.limitter.add(() => this.collectDoc(doc, pipeline))
+      await this.limitter.add(() => this.collectDoc(doc, pipeline, metrics))
     }
     await this.limitter.waitProcessing()
   }

-  async collectDoc (doc: DocIndexState, pipeline: FullTextPipeline): Promise<void> {
+  async collectDoc (doc: DocIndexState, pipeline: FullTextPipeline, metrics: MeasureContext): Promise<void> {
     if (pipeline.cancelling) {
       return
     }
@@ -311,11 +310,7 @@ export class OpenAIEmbeddingsStage implements FullTextPipelineStage {
     let embeddingData: OpenAIEmbeddingResponse | undefined
     while (true) {
       try {
-        embeddingData = await this.metrics.with(
-          'fetch-embeddings',
-          {},
-          async () => await this.getEmbedding(embeddText)
-        )
+        embeddingData = await metrics.with('fetch-embeddings', {}, async () => await this.getEmbedding(embeddText))
         break
       } catch (err: any) {
         if (((err.message as string) ?? '').includes('connect ECONNREFUSED')) {

@@ -5,4 +5,4 @@
     "rootDir": "./src",
     "outDir": "./lib"
   }
 }

@@ -22,6 +22,7 @@ import {
   FindOptions,
   FindResult,
   Hierarchy,
+  IndexingConfiguration,
   ModelDb,
   Ref,
   StorageIterator,
@@ -40,6 +41,9 @@ export interface DbAdapter {
    * Method called after hierarchy is ready to use.
    */
   init: (model: Tx[]) => Promise<void>
+
+  createIndexes: (domain: Domain, config: Pick<IndexingConfiguration<Doc>, 'indexes'>) => Promise<void>
+
   close: () => Promise<void>
   findAll: <T extends Doc>(
     _class: Ref<Class<T>>,
@@ -97,6 +101,8 @@ export class DummyDbAdapter implements DbAdapter {
     return toFindResult([])
   }

+  async createIndexes (domain: Domain, config: Pick<IndexingConfiguration<Doc>, 'indexes'>): Promise<void> {}
+
   async tx (...tx: Tx[]): Promise<TxResult> {
     return {}
   }
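
A hedged sketch of the new hook; `adapter` stands for any concrete DbAdapter (for example a Mongo-backed one), and the configuration mirrors what the model above registers for DOMAIN_DOC_INDEX_STATE:

// Ensure the compound indexes declared via core.mixin.IndexConfiguration exist.
await adapter.createIndexes(DOMAIN_DOC_INDEX_STATE, {
  indexes: [
    { _class: 1, stages: 1, _id: 1, modifiedOn: 1 },
    { _class: 1, _id: 1, modifiedOn: 1 },
    { _class: 1, _id: 1, objectClass: 1 }
  ]
})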

@@ -46,7 +46,6 @@ import type { FullTextAdapter, IndexedDoc, WithFind } from './types'
  */
 export class FullTextIndex implements WithFind {
   txFactory = new TxFactory(core.account.System, true)
-
   consistency: Promise<void> | undefined

   constructor (
@@ -59,6 +58,7 @@ export class FullTextIndex implements WithFind {
     private readonly upgrade: boolean
   ) {
     if (!upgrade) {
+      // Schedule indexing after consistency check
       this.consistency = this.indexer.checkIndexConsistency(dbStorage)

       // Schedule indexing after consistency check
@@ -120,7 +120,6 @@ export class FullTextIndex implements WithFind {
     query: DocumentQuery<T>,
     options?: FindOptions<T>
   ): Promise<FindResult<T>> {
-    console.log('search', query)
     const { _id, $search, ...mainQuery } = query
     if ($search === undefined) return toFindResult([])
@@ -143,7 +142,7 @@ export class FullTextIndex implements WithFind {
       }
     }
     if (attr.type._class === core.class.Collection) {
-      // we need attached documents to be in clases
+      // we need attached documents to be in classes
       const dsc = this.hierarchy.getDescendants(attr.attributeOf)
       classes = classes.concat(dsc)
     }

@@ -24,19 +24,13 @@ import core, {
   MeasureContext,
   Ref,
   ServerStorage,
-  Storage,
-  WithLookup
+  Storage
 } from '@hcengineering/core'
 import { deepEqual } from 'fast-equals'
 import { IndexedDoc } from '../types'
+import { contentStageId, DocUpdateHandler, fieldStateId, FullTextPipeline, FullTextPipelineStage } from './types'
 import {
-  contentStageId,
-  DocUpdateHandler,
-  fieldStateId as fieldStageId,
-  FullTextPipeline,
-  FullTextPipelineStage
-} from './types'
-import {
+  collectPropagate,
   docKey,
   docUpdKey,
   getContent,
@@ -51,7 +45,7 @@ import {
  */
 export class IndexedFieldStage implements FullTextPipelineStage {
   require = []
-  stageId = fieldStageId
+  stageId = fieldStateId

   // Do not clear downloaded content
   clearExcept: string[] = [contentStageId]
@@ -65,11 +59,11 @@ export class IndexedFieldStage implements FullTextPipelineStage {
   indexState?: IndexStageState

-  constructor (private readonly dbStorage: ServerStorage, readonly metrics: MeasureContext) {}
+  constructor (private readonly dbStorage: ServerStorage) {}

   async initialize (storage: Storage, pipeline: FullTextPipeline): Promise<void> {
     const indexable = (
-      await pipeline.model.findAll(core.class.Class, { [core.mixin.FullTextSearchContext + '.propogate']: true })
+      await pipeline.model.findAll(core.class.Class, { [core.mixin.FullTextSearchContext + '.propagate']: true })
     ).map((it) => it._id)

     const forceIndexing = (
@@ -92,7 +86,7 @@ export class IndexedFieldStage implements FullTextPipelineStage {
     return { docs: [], pass: true }
   }

-  async collect (toIndex: DocIndexState[], pipeline: FullTextPipeline): Promise<void> {
+  async collect (toIndex: DocIndexState[], pipeline: FullTextPipeline, metrics: MeasureContext): Promise<void> {
     const byClass = toIndex.reduce<Record<Ref<Class<Doc>>, DocIndexState[]>>((p, c) => {
       p[c.objectClass] = [...(p[c.objectClass] ?? []), c]
       return p
@@ -104,14 +98,14 @@ export class IndexedFieldStage implements FullTextPipelineStage {
       // Obtain real documents
       const valueIds = new Map(values.map((it) => [it._id, it]))
       const objClass = v as Ref<Class<Doc>>
-      const docs = await this.dbStorage.findAll(this.metrics, objClass, {
+      const docs = await this.dbStorage.findAll(metrics, objClass, {
         _id: { $in: Array.from(valueIds.keys()) }
       })
       const attributes = getFullTextAttributes(pipeline.hierarchy, objClass)

       // Child docs.
-      let allChilds: DocIndexState[] | undefined
+      let allChildDocs: DocIndexState[] | undefined

       for (const doc of docs) {
         if (pipeline.cancelling) {
@@ -126,46 +120,53 @@ export class IndexedFieldStage implements FullTextPipelineStage {
           const docUpdate: DocumentUpdate<DocIndexState> = {}
-          const chainDocUpdate: DocumentUpdate<DocIndexState> = {}
-          const chainDocAllUpdate: DocumentUpdate<DocIndexState> = {}
           let changes = 0
+
+          // Convert previous child fields to just
+          for (const [k] of Object.entries(docState.attributes)) {
+            const { attr, docId, _class } = extractDocKey(k)
+            if (
+              (docId !== undefined && attr !== '') ||
+              (_class !== undefined && !pipeline.hierarchy.isDerived(docState.objectClass, _class))
+            ) {
+              // If it some previous indexed field.
+              // ;(docUpdate as any)[docUpdKey(k)] = null
+              ;(docUpdate as any).$unset = { ...((docUpdate as any).$unset ?? {}), [docUpdKey(k)]: '' }
+            }
+          }
+
           for (const [, v] of Object.entries(content)) {
             // Check for content changes and collect update
             const dKey = docKey(v.attr.name, { _class: v.attr.attributeOf })
             const dUKey = docUpdKey(v.attr.name, { _class: v.attr.attributeOf })
-            ;(chainDocAllUpdate as any)[dUKey] = v.value
-            // Full reindex in case stage value is changed
+            // Full re-index in case stage value is changed
             if (!deepEqual(docState.attributes[dKey], v.value)) {
               changes++
               ;(docUpdate as any)[dUKey] = v.value
-              ;(chainDocUpdate as any)[docUpdKey(v.attr.name, { _class: v.attr.attributeOf, docId: docState._id })] =
-                v.value
             }
           }
           if (docState.attachedTo != null && changes > 0) {
-            // We need to clear field stage from parent, so it will be re indexed.
-            await pipeline.update(docState.attachedTo as Ref<DocIndexState>, false, chainDocUpdate)
+            const ctx = getFullTextContext(pipeline.hierarchy, objClass)
+            if (ctx.parentPropagate ?? true) {
+              // We need to clear field stage from parent, so it will be re indexed.
+              await pipeline.update(docState.attachedTo as Ref<DocIndexState>, false, {})
+            }
           }
-          const propogate: Ref<Class<Doc>>[] = this.collectPropogate(pipeline, docState, doc)
-          if (propogate.length > 0) {
-            // We need to propagate all changes to all childs of following clasess.
-            if (allChilds === undefined) {
-              allChilds = await this.dbStorage.findAll(
-                this.metrics.newChild('propogate', {}),
-                core.class.DocIndexState,
-                { attachedTo: { $in: docs.map((it) => it._id) } }
-              )
+          const propagate: Ref<Class<Doc>>[] = collectPropagate(pipeline, docState.objectClass)
+          if (propagate.length > 0) {
+            // We need to propagate all changes to all children of the following classes.
+            if (allChildDocs === undefined) {
+              allChildDocs = await this.dbStorage.findAll(metrics.newChild('propagate', {}), core.class.DocIndexState, {
+                attachedTo: { $in: docs.map((it) => it._id) }
+              })
             }
-            const childs = allChilds.filter((it) => it.attachedTo === docState._id)
+            const childs = allChildDocs.filter((it) => it.attachedTo === docState._id)
             for (const u of childs) {
-              pipeline.add(u)
-              if (u.attributes.incremental === true) {
-                await pipeline.update(u._id, false, chainDocUpdate)
-              } else {
-                await pipeline.update(u._id, false, { ...chainDocAllUpdate, [docUpdKey('incremental')]: true })
+              if (propagate.some((it) => pipeline.hierarchy.isDerived(u.objectClass, it))) {
+                pipeline.add(u)
+                await pipeline.update(u._id, false, {})
               }
             }
           }
@@ -186,39 +187,6 @@ export class IndexedFieldStage implements FullTextPipelineStage {
     }
   }

-  private collectPropogate (
-    pipeline: FullTextPipeline,
-    docState: DocIndexState,
-    doc: WithLookup<Doc>
-  ): Ref<Class<Doc>>[] {
-    const desc = new Set(pipeline.hierarchy.getDescendants(docState.objectClass))
-    let propogate: Ref<Class<Doc>>[] = []
-
-    const ftContext = getFullTextContext(pipeline.hierarchy, docState.objectClass)
-    if (ftContext.propogate !== undefined) {
-      propogate = [...ftContext.propogate]
-    }
-
-    // Add all parent mixins as well
-    for (const a of pipeline.hierarchy.getAncestors(docState.objectClass)) {
-      const dsca = pipeline.hierarchy.getDescendants(a)
-      for (const dd of dsca) {
-        if (pipeline.hierarchy.isMixin(dd)) {
-          desc.add(dd)
-        }
-      }
-    }
-
-    for (const d of desc) {
-      if (pipeline.hierarchy.isMixin(d) && pipeline.hierarchy.hasMixin(doc, d)) {
-        const mContext = getFullTextContext(pipeline.hierarchy, d)
-        if (mContext.propogate !== undefined) {
-          propogate = [...propogate, ...mContext.propogate]
-        }
-      }
-    }
-    return propogate
-  }
-
   async remove (docs: DocIndexState[], pipeline: FullTextPipeline): Promise<void> {
     for (const doc of docs) {
       if (doc.attachedTo !== undefined) {

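The field stage above now clears stale indexed child keys through a Mongo-style $unset section instead of writing null values into $set. A minimal sketch of that accumulation pattern; the key name is illustrative and not part of the commit:

function unsetStaleKey (docUpdate: Record<string, any>, key: string): void {
  // Merge the key into a single $unset section, preserving earlier entries.
  docUpdate.$unset = { ...(docUpdate.$unset ?? {}), [key]: '' }
}

const update: Record<string, any> = {}
unsetStaleKey(update, 'attachment|content#base64') // hypothetical doc key
// update is now { $unset: { 'attachment|content#base64': '' } }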

@@ -13,7 +13,7 @@
 // limitations under the License.
 //
-import {
+import core, {
   Class,
   Doc,
   DocIndexState,
@@ -22,6 +22,7 @@ import {
   extractDocKey,
   MeasureContext,
   Ref,
+  ServerStorage,
   Storage,
   WorkspaceId
 } from '@hcengineering/core'
@@ -35,7 +36,7 @@ import {
   FullTextPipelineStage,
   fullTextPushStageId
 } from './types'
-import { docKey } from './utils'
+import { collectPropagate, docKey, getFullTextContext } from './utils'
 /**
  * @public
@@ -57,9 +58,9 @@ export class FullTextPushStage implements FullTextPipelineStage {
   stageValue: boolean | string = true
   constructor (
+    private readonly dbStorage: ServerStorage,
     readonly fulltextAdapter: FullTextAdapter,
-    readonly workspace: WorkspaceId,
-    readonly metrics: MeasureContext
+    readonly workspace: WorkspaceId
   ) {}
   async initialize (storage: Storage, pipeline: FullTextPipeline): Promise<void> {
@@ -94,41 +95,78 @@ export class FullTextPushStage implements FullTextPipelineStage {
     return { docs: [], pass: true }
   }
-  async collect (toIndex: DocIndexState[], pipeline: FullTextPipeline): Promise<void> {
+  async collect (toIndex: DocIndexState[], pipeline: FullTextPipeline, metrics: MeasureContext): Promise<void> {
     const bulk: IndexedDoc[] = []
-    for (const doc of toIndex) {
-      if (pipeline.cancelling) {
-        return
-      }
-      if (pipeline.cancelling) {
-        return
-      }
-      try {
-        const elasticDoc = createElasticDoc(doc)
-        updateDoc2Elastic(doc.attributes, elasticDoc)
-        this.checkIntegrity(elasticDoc)
-        bulk.push(elasticDoc)
-      } catch (err: any) {
-        const wasError = (doc as any).error !== undefined
-        await pipeline.update(doc._id, false, { [docKey('error')]: JSON.stringify({ message: err.message, err }) })
-        if (wasError) {
-          continue
-        }
-        // Print error only first time, and update it in doc index
-        console.error(err)
-        continue
-      }
-    }
-    // Perform bulk update to elastic
-    try {
-      await this.fulltextAdapter.updateMany(bulk)
-    } catch (err: any) {
-      console.error(err)
-    }
-    for (const doc of toIndex) {
-      await pipeline.update(doc._id, true, {})
-    }
-  }
+    const part = [...toIndex]
+    while (part.length > 0) {
+      const toIndexPart = part.splice(0, 1000)
+      const allChildDocs = await this.dbStorage.findAll(metrics.newChild('find-child', {}), core.class.DocIndexState, {
+        attachedTo: { $in: toIndexPart.map((it) => it._id) }
+      })
+      for (const doc of toIndexPart) {
+        if (pipeline.cancelling) {
+          return
+        }
+        if (pipeline.cancelling) {
+          return
+        }
+        const elasticDoc = createElasticDoc(doc)
+        try {
+          updateDoc2Elastic(doc.attributes, elasticDoc)
+          // Include all child attributes
+          const childDocs = allChildDocs.filter((it) => it.attachedTo === doc._id)
+          if (childDocs.length > 0) {
+            for (const c of childDocs) {
+              const ctx = getFullTextContext(pipeline.hierarchy, c.objectClass)
+              if (ctx.parentPropagate ?? true) {
+                updateDoc2Elastic(c.attributes, elasticDoc, c._id)
+              }
+            }
+          }
+          if (doc.attachedToClass != null && doc.attachedTo != null) {
+            const propagate: Ref<Class<Doc>>[] = collectPropagate(pipeline, doc.attachedToClass)
+            if (propagate.some((it) => pipeline.hierarchy.isDerived(doc.objectClass, it))) {
+              // We need to include all parent content into this one.
+              const [parentDoc] = await this.dbStorage.findAll(
+                metrics.newChild('propagate', {}),
+                core.class.DocIndexState,
+                { _id: doc.attachedTo as Ref<DocIndexState> }
+              )
+              if (parentDoc !== undefined) {
+                updateDoc2Elastic(parentDoc.attributes, elasticDoc, parentDoc._id)
+              }
+            }
+          }
+          this.checkIntegrity(elasticDoc)
+          bulk.push(elasticDoc)
+        } catch (err: any) {
+          const wasError = (doc as any).error !== undefined
+          await pipeline.update(doc._id, false, { [docKey('error')]: JSON.stringify({ message: err.message, err }) })
+          if (wasError) {
+            continue
+          }
+          // Print error only first time, and update it in doc index
+          console.error(err)
+          continue
+        }
+      }
+      // Perform bulk update to elastic
+      try {
+        await this.fulltextAdapter.updateMany(bulk)
+      } catch (err: any) {
+        console.error(err)
+      }
+      for (const doc of toIndex) {
+        await pipeline.update(doc._id, true, {})
+      }
+    }
+  }
@@ -156,14 +194,22 @@ export function createElasticDoc (upd: DocIndexState): IndexedDoc {
   }
   return doc
 }
-function updateDoc2Elastic (attributes: Record<string, any>, doc: IndexedDoc): IndexedDoc {
+function updateDoc2Elastic (attributes: Record<string, any>, doc: IndexedDoc, docIdOverride?: Ref<DocIndexState>): void {
   for (const [k, v] of Object.entries(attributes)) {
-    const { _class, attr, docId, extra } = extractDocKey(k)
+    if (v == null) {
+      continue
+    }
+    let { _class, attr, docId, extra } = extractDocKey(k)
+    if (attr.length === 0) {
+      continue
+    }
     let vv: any = v
-    if (extra.includes('base64')) {
+    if (vv != null && extra.includes('base64')) {
       vv = Buffer.from(v, 'base64').toString()
     }
+    docId = docIdOverride ?? docId
     if (docId === undefined) {
       doc[k] = vv
       continue
@@ -171,8 +217,10 @@ function updateDoc2Elastic (attributes: Record<string, any>, doc: IndexedDoc): I
     const docIdAttr = '|' + docKey(attr, { _class, extra: extra.filter((it) => it !== 'base64') })
     if (vv !== null) {
       // Since we replace array of values, we could ignore null
-      doc[docIdAttr] = [...(doc[docIdAttr] ?? []), vv]
+      doc[docIdAttr] = [...(doc[docIdAttr] ?? [])]
+      if (vv !== '') {
+        doc[docIdAttr].push(vv)
+      }
     }
   }
-  return doc
 }

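Several stages in this commit switch to the same batching shape: copy the work list, splice off chunks of 1000, and issue one child-document lookup per chunk instead of one query per document. A generic sketch of that loop, with illustrative names:

async function processInChunks<T> (items: T[], chunkSize: number, handler: (chunk: T[]) => Promise<void>): Promise<void> {
  const part = [...items] // keep the caller's array intact
  while (part.length > 0) {
    const chunk = part.splice(0, chunkSize)
    await handler(chunk) // e.g. one findAll covering all children of this chunk
  }
}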

@@ -16,20 +16,20 @@
 import core, {
   AttachedDoc,
   Class,
+  DOMAIN_DOC_INDEX_STATE,
   Doc,
   DocIndexState,
   DocumentQuery,
   DocumentUpdate,
-  DOMAIN_DOC_INDEX_STATE,
   Hierarchy,
   MeasureContext,
   ModelDb,
   Ref,
   ServerStorage,
-  setObjectValue,
   TxFactory,
   WorkspaceId,
-  _getOperator
+  _getOperator,
+  setObjectValue
 } from '@hcengineering/core'
 import { DbAdapter } from '../adapter'
 import type { IndexedDoc } from '../types'
@@ -91,13 +91,13 @@ export class FullTextIndexPipeline implements FullTextPipeline {
   }
   async cancel (): Promise<void> {
-    console.log('Cancel indexing', this.indexId, this.workspace)
+    console.log(this.workspace.name, 'Cancel indexing', this.indexId)
     this.cancelling = true
     clearTimeout(this.skippedReiterationTimeout)
     this.triggerIndexing()
     await this.indexing
     await this.flush(true)
-    console.log('Indexing canceled', this.indexId, this.workspace)
+    console.log(this.workspace.name, 'Indexing canceled', this.indexId)
   }
   async markRemove (doc: DocIndexState): Promise<void> {
@@ -130,7 +130,15 @@ export class FullTextIndexPipeline implements FullTextPipeline {
   async flush (force = false): Promise<void> {
     if (this.pending.size > 0 && (this.pending.size >= 50 || force)) {
       // Push all pending changes to storage.
-      await this.storage.update(DOMAIN_DOC_INDEX_STATE, this.pending)
+      try {
+        await this.storage.update(DOMAIN_DOC_INDEX_STATE, this.pending)
+      } catch (err: any) {
+        console.error(err)
+        // Go one by one.
+        for (const o of this.pending) {
+          await this.storage.update(DOMAIN_DOC_INDEX_STATE, new Map([o]))
+        }
+      }
       this.pending.clear()
     }
   }
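The flush fallback above retries entry by entry when the batched write fails, so one malformed update cannot wedge the whole pending queue. A distilled sketch, assuming an update function with the same Map-based signature:

async function flushPending (
  pending: Map<string, object>,
  update: (batch: Map<string, object>) => Promise<void>
): Promise<void> {
  try {
    await update(pending) // fast path: one batched write
  } catch (err: any) {
    console.error(err)
    // Go one by one to narrow the failure down to a single entry.
    for (const o of pending) {
      await update(new Map([o]))
    }
  }
  pending.clear()
}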
@@ -204,7 +212,7 @@ export class FullTextIndexPipeline implements FullTextPipeline {
     if (udoc !== undefined && this.currentStage !== undefined) {
       const stageId = this.currentStage.stageId
       // Update current stage, value
-      update.stages = { ...(udoc.stages ?? {}) }
+      update.stages = this.filterCurrentStages(udoc)
       update.stages[stageId] = mark
       if (this.currentStage.clearExcept !== undefined) {
@@ -218,7 +226,7 @@ export class FullTextIndexPipeline implements FullTextPipeline {
       // Filter unsupported stages
       udoc.stages = update.stages
-      this.stats[stageId] = (this.stats[stageId] ?? 0) + 1
+      this.currentStages[stageId] = (this.currentStages[stageId] ?? 0) + 1
       this.stageChanged++
     }
@@ -234,7 +242,17 @@ export class FullTextIndexPipeline implements FullTextPipeline {
   triggerIndexing = (): void => {}
   skippedReiterationTimeout: any
-  stats: Record<string, number> = {}
+  currentStages: Record<string, number> = {}
+
+  private filterCurrentStages (udoc: DocIndexState): Record<string, string | boolean> {
+    const result: Record<string, string | boolean> = {}
+    for (const [k, v] of Object.entries(udoc.stages ?? {})) {
+      if (this.currentStages[k] !== undefined) {
+        result[k] = v
+      }
+    }
+    return result
+  }
   private async stageUpdate (udoc: DocIndexState, update: DocumentUpdate<DocIndexState>): Promise<void> {
     for (const u of this.currentStage?.updateFields ?? []) {
@@ -255,27 +273,56 @@ export class FullTextIndexPipeline implements FullTextPipeline {
   broadcastClasses = new Set<Ref<Class<Doc>>>()
   updateBroadcast: any = undefined
+  indexesCreated = false
   async doIndexing (): Promise<void> {
     // Check model is upgraded to support indexer.
+    if (!this.indexesCreated) {
+      this.indexesCreated = true
+      // We need to be sure we have individual indexes per stage.
+      for (const st of this.stages) {
+        await this.storage.createIndexes(DOMAIN_DOC_INDEX_STATE, {
+          indexes: [
+            {
+              ['stages.' + st.stageId]: 1
+            },
+            {
+              _class: 1,
+              _id: 1,
+              ['stages.' + st.stageId]: 1,
+              removed: 1
+            }
+          ]
+        })
+      }
+    }
     try {
       this.hierarchy.getClass(core.class.DocIndexState)
     } catch (err: any) {
-      console.log('Models is not upgraded to support indexer', this.indexId, this.workspace)
+      console.log(this.workspace.name, 'Models is not upgraded to support indexer', this.indexId)
       return
     }
-    await this.initStates()
-    while (!this.cancelling) {
-      await this.initializeStages()
-      await this.processRemove()
-      console.log('Indexing:', this.indexId, this.workspace)
-      const _classes = await rateLimitter.exec(() => this.processIndex())
+    await this.metrics.with('init-states', {}, async () => await this.initStates())
+    while (!this.cancelling) {
+      await this.metrics.with('initialize-stages', {}, async () => await this.initializeStages())
+      await this.metrics.with('process-remove', {}, async () => await this.processRemove())
+      const _classes = await rateLimitter.exec(() => {
+        return this.metrics.with('init-stages', {}, async () => await this.processIndex())
+      })
+      // Also update doc index state queries.
+      _classes.push(core.class.DocIndexState)
       _classes.forEach((it) => this.broadcastClasses.add(it))
       if (this.toIndex.size === 0 || this.stageChanged === 0) {
         if (this.toIndex.size === 0) {
-          console.log(`${this.workspace.name} Indexing complete`, this.indexId)
+          console.log(this.workspace.name, 'Indexing complete', this.indexId)
         }
         if (!this.cancelling) {
           // We need to send index update event
@@ -300,7 +347,7 @@ export class FullTextIndexPipeline implements FullTextPipeline {
         }
       }
     }
-    console.log('Exit indexer', this.indexId, this.workspace)
+    console.log(this.workspace.name, 'Exit indexer', this.indexId)
   }
   private async processIndex (): Promise<Ref<Class<Doc>>[]> {
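The index bootstrap above pairs each stage with a single-key index and a compound index matching the "next batch to process" query. With the stock mongodb driver this amounts to roughly the following sketch; the database, collection, and stage names are examples, not the platform's actual values:

import { MongoClient } from 'mongodb'

async function ensureStageIndexes (client: MongoClient, stageId: string): Promise<void> {
  const coll = client.db('workspace').collection('doc-index-state')
  // One index for the stage marker itself...
  await coll.createIndex({ ['stages.' + stageId]: 1 })
  // ...and one matching the query shape { _class, _id, stages.<stage>, removed }.
  await coll.createIndex({ _class: 1, _id: 1, ['stages.' + stageId]: 1, removed: 1 })
}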
@@ -316,30 +363,36 @@ export class FullTextIndexPipeline implements FullTextPipeline {
       if (!st.enabled) {
         break
       }
-      await this.flush(true)
+      await this.metrics.with('flush', {}, async () => await this.flush(true))
       const toSkip = Array.from(this.skipped.entries())
         .filter((it) => it[1] > 3)
         .map((it) => it[0])
-      const result = await this.storage.findAll(
-        core.class.DocIndexState,
-        {
-          [`stages.${st.stageId}`]: { $nin: [st.stageValue] },
-          _id: { $nin: toSkip },
-          removed: false
-        },
-        {
-          limit: globalIndexer.processingSize,
-          sort: {
-            modifiedOn: 1
-          }
-        }
-      )
+      const result = await this.metrics.with(
+        'get-to-index',
+        {},
+        async () =>
+          await this.storage.findAll(
+            core.class.DocIndexState,
+            {
+              [`stages.${st.stageId}`]: { $ne: st.stageValue },
+              _id: { $nin: toSkip },
+              removed: false
+            },
+            {
+              limit: globalIndexer.processingSize,
+              sort: {
+                _id: 1
+              }
+            }
+          )
+      )
       if (result.length > 0) {
         console.log(
-          `Fulltext: Indexing ${this.indexId} ${this.workspace.name} ${st.stageId}`,
-          Object.entries(this.stats)
+          this.workspace.name,
+          `Full text: Indexing ${this.indexId} ${st.stageId}`,
+          Object.entries(this.currentStages)
             .map((it) => `${it[0]}:${it[1]}`)
             .join(' '),
           result.total
@@ -359,16 +412,25 @@ export class FullTextIndexPipeline implements FullTextPipeline {
         if (toIndex.length > 0) {
           // Do Indexing
           this.currentStage = st
-          await st.collect(toIndex, this)
+          await this.metrics.with(
+            'collect',
+            { collector: st.stageId },
+            async (ctx) => await st.collect(toIndex, this, ctx)
+          )
           toIndex.forEach((it) => _classUpdate.add(it.objectClass))
           // go with next stages if they accept it
           for (const nst of this.stages.slice(idx)) {
             const toIndex2 = this.matchStates(nst)
             if (toIndex2.length > 0) {
               this.currentStage = nst
-              await nst.collect(toIndex2, this)
+              await this.metrics.with(
+                'collect',
+                { collector: nst.stageId },
+                async (ctx) => await nst.collect(toIndex2, this, ctx)
+              )
             }
           }
         } else {
@@ -443,19 +505,9 @@ export class FullTextIndexPipeline implements FullTextPipeline {
   }
   private async initStates (): Promise<void> {
-    const statistics = await this.storage.findAll(core.class.DocIndexState, {}, { projection: { stages: 1 } })
-    this.stats = {}
-    const allStageIds = new Set(this.stages.map((it) => it.stageId))
+    this.currentStages = {}
     for (const st of this.stages) {
-      this.stats[st.stageId] = 0
-    }
-    for (const st of statistics) {
-      for (const [s, v] of Object.entries(st.stages ?? {})) {
-        if (v !== false && allStageIds.has(s)) {
-          this.stats[s] = (this.stats[s] ?? 0) + 1
-        }
-      }
+      this.currentStages[st.stageId] = 0
     }
   }
@@ -480,7 +532,7 @@ export class FullTextIndexPipeline implements FullTextPipeline {
       }
       if (!isClassIndexable(this.hierarchy, c)) {
-        // No need, since no indixable fields or attachments.
+        // No need, since no indexable fields or attachments.
         continue
       }
@@ -490,6 +542,9 @@ export class FullTextIndexPipeline implements FullTextPipeline {
       ).map((it) => it._id)
       while (true) {
+        if (this.cancelling) {
+          return
+        }
         const newDocs: DocIndexState[] = (
           await dbStorage.findAll<Doc>(
             this.metrics,
@@ -528,7 +583,7 @@ export class FullTextIndexPipeline implements FullTextPipeline {
       await this.storage.clean(DOMAIN_DOC_INDEX_STATE, docIds)
     }
-    // Clean for non existing clases
+    // Clean for non existing classes
     const unknownClasses = (
       await this.storage.findAll(

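Every phase of the indexing loop above is now wrapped in metrics.with(name, params, op), which runs the operation under a named child measurement. A simplified stand-in with the same call shape, for illustration only; the real MeasureContext aggregates timings rather than logging them:

async function withMeasure<T> (name: string, op: () => Promise<T>): Promise<T> {
  const start = Date.now()
  try {
    return await op()
  } finally {
    // Stand-in for metric aggregation: report how long the phase took.
    console.log(name, 'took', Date.now() - start, 'ms')
  }
}

// usage, mirroring the calls above:
// await withMeasure('init-states', async () => await initStates())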

@@ -24,19 +24,21 @@ import core, {
   Hierarchy,
   IndexStageState,
   isFullTextAttribute,
+  MeasureContext,
   Ref,
+  ServerStorage,
   Storage
 } from '@hcengineering/core'
 import { translate } from '@hcengineering/platform'
 import { convert } from 'html-to-text'
 import { IndexedDoc } from '../types'
 import { contentStageId, DocUpdateHandler, fieldStateId, FullTextPipeline, FullTextPipelineStage } from './types'
-import { getFullTextContext, loadIndexStageStage } from './utils'
+import { collectPropagate, getFullTextContext, loadIndexStageStage } from './utils'
 /**
  * @public
  */
-export const summaryStageId = 'sum-v3a'
+export const summaryStageId = 'sum-v4'
 /**
  * @public
@@ -60,6 +62,11 @@ export class FullSummaryStage implements FullTextPipelineStage {
   indexState?: IndexStageState
+  // Summary should be no bigger than 1mb of data.
+  summaryLimit = 1024 * 1024
+
+  constructor (private readonly dbStorage: ServerStorage) {}
   async initialize (storage: Storage, pipeline: FullTextPipeline): Promise<void> {
     const indexable = (
       await pipeline.model.findAll(core.class.Class, { [core.mixin.FullTextSearchContext + '.fullTextSummary']: true })
@@ -80,29 +87,79 @@ export class FullSummaryStage implements FullTextPipelineStage {
     return { docs: [], pass: true }
   }
-  async collect (toIndex: DocIndexState[], pipeline: FullTextPipeline): Promise<void> {
-    for (const doc of toIndex) {
-      if (pipeline.cancelling) {
-        return
-      }
-      const needIndex = isIndexingRequired(pipeline, doc)
-      // No need to index this class, mark embeddings as empty ones.
-      if (!needIndex) {
-        await pipeline.update(doc._id, this.stageValue, {})
-        continue
-      }
-      const update: DocumentUpdate<DocIndexState> = {}
-      const embeddingText = await extractIndexedValues(doc, pipeline.hierarchy, {
-        matchExtra: this.matchExtra,
-        fieldFilter: this.fieldFilter
-      })
-      update.fullSummary = embeddingText
-      await pipeline.update(doc._id, this.stageValue, update)
+  async collect (toIndex: DocIndexState[], pipeline: FullTextPipeline, metrics: MeasureContext): Promise<void> {
+    const part = [...toIndex]
+    while (part.length > 0) {
+      const toIndexPart = part.splice(0, 1000)
+      const allChildDocs = await this.dbStorage.findAll(
+        metrics.newChild('fulltext-find-child', {}),
+        core.class.DocIndexState,
+        { attachedTo: { $in: toIndexPart.map((it) => it._id) } }
+      )
+      for (const doc of toIndexPart) {
+        if (pipeline.cancelling) {
+          return
+        }
+        const needIndex = isIndexingRequired(pipeline, doc)
+        // No need to index this class, mark embeddings as empty ones.
+        if (!needIndex) {
+          await pipeline.update(doc._id, this.stageValue, {})
+          continue
+        }
+        const update: DocumentUpdate<DocIndexState> = {}
+        let embeddingText = await extractIndexedValues(doc, pipeline.hierarchy, {
+          matchExtra: this.matchExtra,
+          fieldFilter: this.fieldFilter
+        })
+        // Include all child attributes
+        const childDocs = allChildDocs.filter((it) => it.attachedTo === doc._id)
+        if (childDocs.length > 0) {
+          for (const c of childDocs) {
+            const ctx = getFullTextContext(pipeline.hierarchy, c.objectClass)
+            if (ctx.parentPropagate ?? true) {
+              if (embeddingText.length > this.summaryLimit) {
+                break
+              }
+              embeddingText += await extractIndexedValues(c, pipeline.hierarchy, {
+                matchExtra: this.matchExtra,
+                fieldFilter: this.fieldFilter
+              })
+            }
+          }
+        }
+        if (doc.attachedToClass != null && doc.attachedTo != null) {
+          const propagate: Ref<Class<Doc>>[] = collectPropagate(pipeline, doc.attachedToClass)
+          if (propagate.some((it) => pipeline.hierarchy.isDerived(doc.objectClass, it))) {
+            // We need to include all parent content into this one.
+            const [parentDoc] = await this.dbStorage.findAll(
+              metrics.newChild('propagate', {}),
+              core.class.DocIndexState,
+              { _id: doc.attachedTo as Ref<DocIndexState> }
+            )
+            if (parentDoc !== undefined) {
+              if (embeddingText.length > this.summaryLimit) {
+                break
+              }
+              embeddingText += await extractIndexedValues(parentDoc, pipeline.hierarchy, {
+                matchExtra: this.matchExtra,
+                fieldFilter: this.fieldFilter
+              })
+            }
+          }
+        }
+        update.fullSummary = embeddingText
+        await pipeline.update(doc._id, this.stageValue, update)
+      }
     }
   }
@@ -137,8 +194,14 @@ export async function extractIndexedValues (
   const currentReplacement: Record<string, string> = {}
   for (const [k, v] of Object.entries(doc.attributes)) {
+    if (v == null) {
+      continue
+    }
     try {
       const { _class, attr, extra, docId } = extractDocKey(k)
+      if (docId !== undefined) {
+        continue
+      }
       let sourceContent = `${v as string}`.trim()
       if (extra.includes('base64')) {
@@ -181,7 +244,7 @@ export async function extractIndexedValues (
         continue
       }
       if (keyAttr.type._class === core.class.TypeAttachment && extra.length === 0) {
-        // Skipt attachment id values.
+        // Skip attachment id values.
         continue
       }

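The summary stage now concatenates child and parent text but stops once the accumulated summary passes summaryLimit (1 MB), so a document with thousands of children cannot produce an unbounded payload. The rule, reduced to a sketch:

const summaryLimit = 1024 * 1024 // 1mb, as in the stage above

function appendCapped (summary: string, extra: string): string {
  // Mirrors the stage: check the cap before appending the next fragment.
  if (summary.length > summaryLimit) {
    return summary
  }
  return summary + extra
}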

@@ -20,6 +20,7 @@ import {
   DocumentQuery,
   DocumentUpdate,
   Hierarchy,
+  MeasureContext,
   ModelDb,
   Ref,
   Storage
@@ -70,7 +71,7 @@ export interface FullTextPipelineStage {
   // If specified, will clear all stages except specified + current
   clearExcept?: string[]
-  // Will propogate some changes for both mark values.
+  // Will propagate some changes for both mark values.
   updateFields: DocUpdateHandler[]
   enabled: boolean
@@ -80,7 +81,7 @@ export interface FullTextPipelineStage {
   initialize: (storage: Storage, pipeline: FullTextPipeline) => Promise<void>
   // Collect all changes related to bulk of document states
-  collect: (docs: DocIndexState[], pipeline: FullTextPipeline) => Promise<void>
+  collect: (docs: DocIndexState[], pipeline: FullTextPipeline, ctx: MeasureContext) => Promise<void>
   // Handle remove of items.
   remove: (docs: DocIndexState[], pipeline: FullTextPipeline) => Promise<void>
@@ -101,9 +102,9 @@ export const contentStageId = 'cnt-v2b'
 /**
  * @public
  */
-export const fieldStateId = 'fld-v3'
+export const fieldStateId = 'fld-v4'
 /**
  * @public
  */
-export const fullTextPushStageId = 'fts-v1'
+export const fullTextPushStageId = 'fts-v2'


@@ -41,6 +41,7 @@ import core, {
 } from '@hcengineering/core'
 import { deepEqual } from 'fast-equals'
 import plugin from '../plugin'
+import { FullTextPipeline } from './types'
 /**
  * @public
  */
@@ -234,6 +235,38 @@ export function getFullTextContext (
   return {
     fullTextSummary: false,
     forceIndex: false,
-    propogate: []
+    propagate: []
   }
 }
+
+/**
+ * @public
+ */
+export function collectPropagate (pipeline: FullTextPipeline, objectClass: Ref<Class<Doc>>): Ref<Class<Doc>>[] {
+  const desc = new Set(pipeline.hierarchy.getDescendants(objectClass))
+  const propagate = new Set<Ref<Class<Doc>>>()
+
+  const ftContext = getFullTextContext(pipeline.hierarchy, objectClass)
+  ftContext?.propagate?.forEach((it) => propagate.add(it))
+
+  // Add all parent mixins as well
+  for (const a of pipeline.hierarchy.getAncestors(objectClass)) {
+    const ftContext = getFullTextContext(pipeline.hierarchy, a)
+    ftContext?.propagate?.forEach((it) => propagate.add(it))
+    const dsca = pipeline.hierarchy.getDescendants(a)
+    for (const dd of dsca) {
+      if (pipeline.hierarchy.isMixin(dd)) {
+        desc.add(dd)
+      }
+    }
+  }
+  for (const d of desc) {
+    if (pipeline.hierarchy.isMixin(d)) {
+      const mContext = getFullTextContext(pipeline.hierarchy, d)
+      mContext?.propagate?.forEach((it) => propagate.add(it))
+    }
+  }
+  return Array.from(propagate.values())
+}

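collectPropagate unions propagate lists from three places: the class itself, every ancestor, and any mixins found among the descendants. Stripped of the hierarchy calls, the collection logic is just a set union; a sketch, not the platform API:

function unionPropagate (lists: Array<string[] | undefined>): string[] {
  const out = new Set<string>()
  for (const l of lists) {
    // undefined lists (classes without a full-text context) contribute nothing
    l?.forEach((it) => out.add(it))
  }
  return Array.from(out)
}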

@@ -327,7 +327,11 @@ class TServerStorage implements ServerStorage {
       if (query?.$search !== undefined) {
         return ctx.with('full-text-find-all', {}, (ctx) => this.fulltext.findAll(ctx, clazz, query, options))
       }
-      return ctx.with('db-find-all', { _class: clazz, domain }, () =>
+      const q: Record<string, any> = { _class: clazz }
+      for (const [k] of Object.entries(query)) {
+        q[k] = '...'
+      }
+      return ctx.with('db-find-all', { q: JSON.stringify(q) }, () =>
         this.getAdapter(domain).findAll(clazz, query, options)
       )
     })
@@ -831,6 +835,7 @@ export async function createServerStorage (
   }
   const stages = conf.fulltextAdapter.stages(fulltextAdapter, storage, storageAdapter, contentAdapter)
   console.timeLog(conf.workspace.name, 'finish index pipeline stages')
   const indexer = new FullTextIndexPipeline(
     defaultAdapter,
     stages,

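The db-find-all measurement now reports only the query shape, replacing every value with '...': the metric label stays low-cardinality and no user data leaks into metrics. The redaction step on its own:

function queryShape (clazz: string, query: Record<string, any>): string {
  const q: Record<string, any> = { _class: clazz }
  for (const [k] of Object.entries(query)) {
    q[k] = '...' // keep the key, drop the value
  }
  return JSON.stringify(q)
}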

@@ -18,6 +18,7 @@ import {
   Class,
   Doc,
   DocumentQuery,
+  IndexingConfiguration,
   MeasureContext,
   Ref,
   toWorkspaceString,
@@ -27,6 +28,7 @@ import {
 import type { EmbeddingSearchOption, FullTextAdapter, IndexedDoc } from '@hcengineering/server-core'
 import { Client, errors as esErr } from '@elastic/elasticsearch'
+import { Domain } from 'node:domain'
 class ElasticAdapter implements FullTextAdapter {
   constructor (
     private readonly client: Client,
@@ -34,6 +36,8 @@ class ElasticAdapter implements FullTextAdapter {
     private readonly _metrics: MeasureContext
   ) {}
+
+  async createIndexes (domain: Domain, config: Pick<IndexingConfiguration<Doc>, 'indexes'>): Promise<void> {}
   async initMapping (field?: { key: string, dims: number }): Promise<Record<string, number>> {
     // const current = await this.client.indices.getMapping({})
     // console.log('Mappings', current)


@@ -24,6 +24,7 @@ import core, {
   FindResult,
   FullTextData,
   Hierarchy,
+  IndexingConfiguration,
   Ref,
   Space,
   StorageIterator,
@@ -53,6 +54,8 @@ class ElasticDataAdapter implements DbAdapter {
   async init (model: Tx[]): Promise<void> {}
+
+  async createIndexes (domain: Domain, config: Pick<IndexingConfiguration<Doc>, 'indexes'>): Promise<void> {}
   async close (): Promise<void> {
     await this.client.close()
   }


@@ -330,47 +330,52 @@ export function start (
           }
         : {}
-      https.get(url, options, (response) => {
-        console.log('status', response.statusCode)
-        if (response.statusCode !== 200) {
-          // eslint-disable-next-line @typescript-eslint/restrict-template-expressions
-          res.status(500).send(`server returned ${response.statusCode}`)
-          return
-        }
-        const id = uuid()
-        const contentType = response.headers['content-type']
-        const meta = {
-          'Content-Type': contentType
-        }
-        const data: Buffer[] = []
-        response
-          .on('data', function (chunk) {
-            data.push(chunk)
-          })
-          .on('end', function () {
-            const buffer = Buffer.concat(data)
-            config.minio
-              .put(payload.workspace, id, buffer, 0, meta)
-              .then(async (objInfo) => {
-                console.log('uploaded uuid', id, objInfo.etag)
-                res.status(200).send({
-                  id,
-                  contentType,
-                  size: buffer.length
-                })
-              })
-              .catch((err) => {
-                if (err !== null) {
-                  console.log('minio putObject error', err)
-                  res.status(500).send(err)
-                }
-              })
-          })
-          .on('error', function (err) {
-            res.status(500).send(err)
-          })
-      })
+      https
+        .get(url, options, (response) => {
+          console.log('status', response.statusCode)
+          if (response.statusCode !== 200) {
+            // eslint-disable-next-line @typescript-eslint/restrict-template-expressions
+            res.status(500).send(`server returned ${response.statusCode}`)
+            return
+          }
+          const id = uuid()
+          const contentType = response.headers['content-type']
+          const meta = {
+            'Content-Type': contentType
+          }
+          const data: Buffer[] = []
+          response
+            .on('data', function (chunk) {
+              data.push(chunk)
+            })
+            .on('end', function () {
+              const buffer = Buffer.concat(data)
+              config.minio
+                .put(payload.workspace, id, buffer, 0, meta)
+                .then(async (objInfo) => {
+                  console.log('uploaded uuid', id, objInfo.etag)
+                  res.status(200).send({
+                    id,
+                    contentType,
+                    size: buffer.length
+                  })
+                })
+                .catch((err) => {
+                  if (err !== null) {
+                    console.log('minio putObject error', err)
+                    res.status(500).send(err)
+                  }
+                })
+            })
+            .on('error', function (err) {
+              res.status(500).send(err)
+            })
+        })
+        .on('error', (e) => {
+          console.error(e)
+          res.status(500).send(e)
+        })
     } catch (error) {
       console.log(error)
       res.status(500).send()

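The reshuffled upload proxy attaches an 'error' listener to the value returned by https.get (the ClientRequest), not only to the response stream, so DNS or socket failures now answer with a 500 instead of crashing the process. A minimal sketch of that pattern; the handler body is illustrative:

import https from 'https'

function fetchWithRequestErrors (url: string, onError: (e: Error) => void): void {
  https
    .get(url, (response) => {
      response.resume() // the real handler buffers and re-uploads the body
    })
    .on('error', (e) => {
      // Fires for connection-level failures that never produce a response.
      onError(e)
    })
}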

@@ -28,6 +28,7 @@ import core, {
   FindOptions,
   FindResult,
   Hierarchy,
+  IndexingConfiguration,
   isOperator,
   Lookup,
   Mixin,
@@ -97,6 +98,16 @@ abstract class MongoAdapterBase implements DbAdapter {
   async init (): Promise<void> {}
+
+  async createIndexes (domain: Domain, config: Pick<IndexingConfiguration<Doc>, 'indexes'>): Promise<void> {
+    for (const vv of config.indexes) {
+      try {
+        await this.db.collection(domain).createIndex(vv)
+      } catch (err: any) {
+        console.error(err)
+      }
+    }
+  }
   async tx (...tx: Tx[]): Promise<TxResult> {
     return {}
   }
@@ -338,10 +349,10 @@ abstract class MongoAdapterBase implements DbAdapter {
   private async fillSortPipeline<T extends Doc>(
     clazz: Ref<Class<T>>,
-    options: FindOptions<T>,
+    options: FindOptions<T> | undefined,
     pipeline: any[]
   ): Promise<void> {
-    if (options.sort !== undefined) {
+    if (options?.sort !== undefined) {
       const sort = {} as any
       for (const _key in options.sort) {
         const key = this.translateKey(_key, clazz)
@@ -369,12 +380,12 @@ abstract class MongoAdapterBase implements DbAdapter {
   private async findWithPipeline<T extends Doc>(
     clazz: Ref<Class<T>>,
     query: DocumentQuery<T>,
-    options: FindOptions<T>
+    options?: FindOptions<T>
   ): Promise<FindResult<T>> {
     const pipeline = []
     const match = { $match: this.translateQuery(clazz, query) }
-    const slowPipeline = isLookupQuery(query) || isLookupSort(options.sort)
-    const steps = await this.getLookups(clazz, options.lookup)
+    const slowPipeline = isLookupQuery(query) || isLookupSort(options?.sort)
+    const steps = await this.getLookups(clazz, options?.lookup)
     if (slowPipeline) {
       for (const step of steps) {
         pipeline.push({ $lookup: step })
@@ -383,7 +394,7 @@ abstract class MongoAdapterBase implements DbAdapter {
     pipeline.push(match)
     const resultPipeline: any[] = []
     await this.fillSortPipeline(clazz, options, pipeline)
-    if (options.limit !== undefined) {
+    if (options?.limit !== undefined) {
       resultPipeline.push({ $limit: options.limit })
     }
     if (!slowPipeline) {
@@ -402,11 +413,7 @@ abstract class MongoAdapterBase implements DbAdapter {
     pipeline.push({
       $facet: {
         results: resultPipeline,
-        totalCount: [
-          {
-            $count: 'count'
-          }
-        ]
+        ...(options?.total === true ? { totalCount: [{ $count: 'count' }] } : {})
       }
     })
     const domain = this.hierarchy.getDomain(clazz)
@@ -414,9 +421,9 @@ abstract class MongoAdapterBase implements DbAdapter {
     cursor.maxTimeMS(30000)
     const res = (await cursor.toArray())[0]
     const result = res.results as WithLookup<T>[]
-    const total = res.totalCount?.shift()?.count
+    const total = res.totalCount?.shift()?.count ?? -1
     for (const row of result) {
-      await this.fillLookupValue(clazz, options.lookup, row)
+      await this.fillLookupValue(clazz, options?.lookup, row)
       this.clearExtraLookups(row)
     }
     return toFindResult(result, total)
@@ -525,7 +532,7 @@ abstract class MongoAdapterBase implements DbAdapter {
       }
       cursor = cursor.project(projection)
     }
-    let total: number | undefined
+    let total: number = -1
     if (options !== null && options !== undefined) {
       if (options.sort !== undefined) {
         const sort: Sort = {}
@@ -537,7 +544,9 @@ abstract class MongoAdapterBase implements DbAdapter {
         cursor = cursor.sort(sort)
       }
       if (options.limit !== undefined) {
-        total = await coll.countDocuments(mongoQuery)
+        if (options.total === true) {
+          total = await coll.countDocuments(mongoQuery)
+        }
         cursor = cursor.limit(options.limit)
       }
     }
@@ -611,14 +620,18 @@ abstract class MongoAdapterBase implements DbAdapter {
       if (ops.length > 0) {
         const part = ops.splice(0, 500)
         await coll.bulkWrite(
-          part.map((it) => ({
-            updateOne: {
-              filter: { _id: it[0] },
-              update: {
-                $set: it[1]
+          part.map((it) => {
+            const { $unset, ...set } = it[1] as any
+            return {
+              updateOne: {
+                filter: { _id: it[0] },
+                update: {
+                  $set: set,
+                  ...($unset !== undefined ? { $unset } : {})
+                }
               }
             }
-          }))
+          })
         )
       }
     } catch (err: any) {

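The bulkWrite fix splits $unset out of the accumulated update, since Mongo rejects operator keys nested inside $set. One operation built that way, as a sketch assuming the mongodb driver's AnyBulkWriteOperation type:

import { type AnyBulkWriteOperation, type Document } from 'mongodb'

function toUpdateOne (id: string, raw: Record<string, any>): AnyBulkWriteOperation<Document> {
  const { $unset, ...set } = raw
  return {
    updateOne: {
      filter: { _id: id as any },
      update: {
        $set: set,
        // Attach $unset only when the update actually carries one.
        ...($unset !== undefined ? { $unset } : {})
      }
    }
  }
}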

@@ -6,7 +6,7 @@ const apmUrl = process.env.APM_SERVER_URL
 const metricsFile = process.env.METRICS_FILE
 const metricsConsole = (process.env.METRICS_CONSOLE ?? 'false') === 'true'
-const METRICS_UPDATE_INTERVAL = 60000
+const METRICS_UPDATE_INTERVAL = !metricsConsole ? 1000 : 60000
 /**
  * @public
@@ -31,7 +31,7 @@ export function getMetricsContext (): MeasureContext {
   let oldMetricsValue = ''
   const intTimer = setInterval(() => {
-    const val = metricsToString(metrics)
+    const val = metricsToString(metrics, 'System', 140)
     if (val !== oldMetricsValue) {
       oldMetricsValue = val
       if (metricsFile !== undefined) {

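The metrics loop serializes on a timer and writes only when the rendered text changes, to METRICS_FILE when set and to the console otherwise. A self-contained sketch of that loop; the render callback stands in for metricsToString:

import { writeFileSync } from 'fs'

function startMetricsDump (render: () => string, file?: string, intervalMs = 1000): NodeJS.Timeout {
  let old = ''
  return setInterval(() => {
    const val = render()
    if (val !== old) {
      old = val // skip writes when nothing changed
      if (file !== undefined) {
        writeFileSync(file, val)
      } else {
        console.info(val)
      }
    }
  }, intervalMs)
}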

@@ -23,6 +23,7 @@ import core, {
   FindOptions,
   FindResult,
   Hierarchy,
+  IndexingConfiguration,
   ModelDb,
   Ref,
   Space,
@@ -51,6 +52,8 @@ class MinioBlobAdapter implements DbAdapter {
   async init (model: Tx[]): Promise<void> {}
+
+  async createIndexes (domain: Domain, config: Pick<IndexingConfiguration<Doc>, 'indexes'>): Promise<void> {}
   async close (): Promise<void> {}

   find (domain: Domain): StorageIterator {
View File

@ -63,7 +63,7 @@ export class LibRetranslateStage implements TranslationStage {
indexState?: IndexStageState indexState?: IndexStageState
constructor (readonly metrics: MeasureContext, readonly workspaceId: WorkspaceId) {} constructor (readonly workspaceId: WorkspaceId) {}
async initialize (storage: Storage, pipeline: FullTextPipeline): Promise<void> { async initialize (storage: Storage, pipeline: FullTextPipeline): Promise<void> {
// Just do nothing // Just do nothing
@ -106,7 +106,7 @@ export class LibRetranslateStage implements TranslationStage {
} }
} }
async collect (toIndex: DocIndexState[], pipeline: FullTextPipeline): Promise<void> { async collect (toIndex: DocIndexState[], pipeline: FullTextPipeline, metrics: MeasureContext): Promise<void> {
if (!this.enabled) { if (!this.enabled) {
return return
} }


@@ -14,6 +14,7 @@
 //
 import core, {
+  metricsAggregate,
   generateId,
   MeasureContext,
   Ref,
@@ -26,7 +27,7 @@ import core, {
 import { readRequest, Response, serialize, UNAUTHORIZED, unknownError } from '@hcengineering/platform'
 import type { Pipeline, SessionContext } from '@hcengineering/server-core'
 import { decodeToken, Token } from '@hcengineering/server-token'
-import { createServer, IncomingMessage } from 'http'
+import { createServer, IncomingMessage, ServerResponse } from 'http'
 import WebSocket, { RawData, WebSocketServer } from 'ws'
 import { BroadcastCall, PipelineFactory, Session } from './types'
@@ -51,7 +52,7 @@ interface Workspace {
 }
 class SessionManager {
-  private readonly workspaces = new Map<string, Workspace>()
+  readonly workspaces = new Map<string, Workspace>()
   constructor (readonly sessionFactory: (token: Token, pipeline: Pipeline, broadcast: BroadcastCall) => Session) {}
@@ -80,7 +81,7 @@ class SessionManager {
     }
     if (token.extra?.model === 'upgrade') {
-      if (LOGGING_ENABLED) console.log('reloading workspace', JSON.stringify(token))
+      if (LOGGING_ENABLED) console.log(token.workspace.name, 'reloading workspace', JSON.stringify(token))
       this.upgradeId = sessionId
       // If upgrade client is used.
       // Drop all existing clients
@@ -92,7 +93,7 @@ class SessionManager {
       workspace.sessions = []
       workspace.upgrade = token.extra?.model === 'upgrade'
     }
-    if (LOGGING_ENABLED) console.log('no sessions for workspace', wsString)
+    if (LOGGING_ENABLED) console.log(token.workspace.name, 'no sessions for workspace', wsString)
     // Re-create pipeline.
     workspace.pipeline = pipelineFactory(ctx, token.workspace, true, (tx) =>
       this.broadcastAll(workspace as Workspace, tx)
@@ -117,6 +118,7 @@ class SessionManager {
     if (existingSession !== undefined) {
       if (LOGGING_ENABLED) {
         console.log(
+          token.workspace.name,
           'found existing session',
           token.email,
           existingSession[0].sessionId,
@@ -198,7 +200,7 @@ class SessionManager {
     code: number,
     reason: string
   ): Promise<void> {
-    if (LOGGING_ENABLED) console.log(`closing websocket, code: ${code}, reason: ${reason}`)
+    if (LOGGING_ENABLED) console.log(workspaceId.name, `closing websocket, code: ${code}, reason: ${reason}`)
     const wsid = toWorkspaceString(workspaceId)
     const workspace = this.workspaces.get(wsid)
     if (workspace === undefined) {
@@ -217,7 +219,7 @@ class SessionManager {
     }
     if (workspace.sessions.length === 0) {
       const wsUID = workspace.id
-      if (LOGGING_ENABLED) console.log('no sessions for workspace', wsid, wsUID)
+      if (LOGGING_ENABLED) console.log(workspaceId.name, 'no sessions for workspace', wsid, wsUID)
       const waitAndClose = async (workspace: Workspace): Promise<void> => {
         try {
@@ -231,7 +233,7 @@ class SessionManager {
           if (LOGGING_ENABLED) console.timeLog(workspaceId.name, 'Closed workspace', wsUID)
         } catch (err: any) {
           this.workspaces.delete(wsid)
-          if (LOGGING_ENABLED) console.error(err)
+          if (LOGGING_ENABLED) console.error(workspaceId.name, err)
         }
       }
       workspace.closing = waitAndClose(workspace)
@@ -305,7 +307,7 @@ class SessionManager {
       console.error(new Error('internal: cannot find sessions'))
       return
     }
-    if (LOGGING_ENABLED) console.log(`server broadcasting to ${workspace.sessions.length} clients...`)
+    if (LOGGING_ENABLED) console.log(workspaceId.name, `server broadcasting to ${workspace.sessions.length} clients...`)
     const msg = serialize(resp)
     for (const session of workspace.sessions) {
       if (session[0] !== from) {
@@ -336,7 +338,7 @@ async function handleRequest<S extends Session> (
     ws.close(0, 'upgrade')
     return
   }
-  const userCtx = ctx.newChild(service.getUser(), { userId: service.getUser() }) as SessionContext
+  const userCtx = ctx.newChild('client', { workspace }) as SessionContext
   userCtx.sessionId = service.sessionInstanceId ?? ''
   const f = (service as any)[request.method]
   let timeout: any
@@ -449,14 +451,19 @@ export function start (
       }
       // remove session after 1seconds, give a time to reconnect.
       if (code === 1000) {
-        if (LOGGING_ENABLED) console.log(`client "${token.email}" closed normally`)
+        if (LOGGING_ENABLED) console.log(token.workspace.name, `client "${token.email}" closed normally`)
         void sessions.close(ctx, ws, token.workspace, code, reason.toString())
       } else {
         if (LOGGING_ENABLED) {
-          console.log(`client "${token.email}" closed abnormally, waiting reconnect`, code, reason.toString())
+          console.log(
+            token.workspace.name,
+            `client "${token.email}" closed abnormally, waiting reconnect`,
+            code,
+            reason.toString()
+          )
         }
         session.closeTimeout = setTimeout(() => {
-          if (LOGGING_ENABLED) console.log(`client "${token.email}" force closed`)
+          if (LOGGING_ENABLED) console.log(token.workspace.name, `client "${token.email}" force closed`)
           void sessions.close(ctx, ws, token.workspace, code, reason.toString())
         }, 10000)
       }
@@ -469,6 +476,36 @@ export function start (
   })
   const server = createServer()
+  server.on('request', (request: IncomingMessage, response: ServerResponse) => {
+    const url = new URL('http://localhost' + (request.url ?? ''))
+    const token = url.pathname.substring(1)
+    try {
+      const payload = decodeToken(token ?? '')
+      console.log(payload.workspace, 'statistics request')
+      response.writeHead(200, {
+        'Content-Type': 'application/json',
+        'Access-Control-Allow-Origin': '*',
+        'Access-Control-Allow-Methods': 'GET, OPTIONS',
+        'Access-Control-Allow-Headers': 'Content-Type'
+      })
+      const data = {
+        metrics: metricsAggregate((ctx as any).metrics),
+        activeSessions: {}
+      }
+      for (const [k, v] of sessions.workspaces) {
+        ;(data.activeSessions as any)[k] = v.sessions.length
+      }
+      const json = JSON.stringify(data)
+      response.end(json)
+    } catch (err) {
+      response.writeHead(404, {})
+      response.end()
+    }
+  })
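The new 'request' handler turns the websocket server into a small statistics endpoint: the token travels as the URL path and the reply carries aggregated metrics plus per-workspace session counts. A hypothetical client, assuming a Node 18+ runtime with global fetch and an example port:

async function fetchServerStats (token: string): Promise<{ metrics: any, activeSessions: Record<string, number> }> {
  const res = await fetch(`http://localhost:3333/${token}`) // port is an example
  if (!res.ok) {
    throw new Error(`statistics request failed: ${res.status}`)
  }
  return await res.json()
}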
   server.on('upgrade', (request: IncomingMessage, socket: any, head: Buffer) => {
     const url = new URL('http://localhost' + (request.url ?? ''))
     const token = url.pathname.substring(1)
@@ -476,7 +513,7 @@ export function start (
     try {
       const payload = decodeToken(token ?? '')
       const sessionId = url.searchParams.get('sessionId')
-      if (LOGGING_ENABLED) console.log('client connected with payload', payload, sessionId)
+      if (LOGGING_ENABLED) console.log(payload.workspace.name, 'client connected with payload', payload, sessionId)
       if (payload.workspace.productId !== productId) {
         throw new Error('Invalid workspace product')