User online (#1491)

Signed-off-by: Denis Bykhov <80476319+BykhovDenis@users.noreply.github.com>
This commit is contained in:
Denis Bykhov 2022-04-23 09:45:55 +06:00 committed by GitHub
parent 53821621c6
commit 20e230dd27
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 569 additions and 120 deletions

View File

@ -14,10 +14,9 @@
//
import { Class, ClientConnection, Doc, DocumentQuery, FindOptions, FindResult, Ref, ServerStorage, Tx, TxHander, TxResult, DOMAIN_TX, MeasureMetricsContext } from '@anticrm/core'
import { createInMemoryAdapter, createInMemoryTxAdapter } from '@anticrm/dev-storage'
import { createInMemoryTxAdapter } from '@anticrm/dev-storage'
import { protoDeserialize, protoSerialize, setMetadata } from '@anticrm/platform'
import type { DbConfiguration } from '@anticrm/server-core'
import { createServerStorage, FullTextAdapter, IndexedDoc } from '@anticrm/server-core'
import { createInMemoryAdapter, createServerStorage, DbConfiguration, FullTextAdapter, IndexedDoc } from '@anticrm/server-core'
import devmodel from '@anticrm/devmodel'
class ServerStorageWrapper implements ClientConnection {

View File

@ -16,8 +16,8 @@
import type { Doc, Ref, TxResult } from '@anticrm/core'
import { DOMAIN_TX, MeasureMetricsContext } from '@anticrm/core'
import { createInMemoryAdapter, createInMemoryTxAdapter } from '@anticrm/dev-storage'
import { createPipeline, DbConfiguration, FullTextAdapter, IndexedDoc } from '@anticrm/server-core'
import { createInMemoryTxAdapter } from '@anticrm/dev-storage'
import { createInMemoryAdapter, createPipeline, DbConfiguration, FullTextAdapter, IndexedDoc } from '@anticrm/server-core'
import { start as startJsonRpc } from '@anticrm/server-ws'
class NullFullTextAdapter implements FullTextAdapter {

View File

@ -1,5 +1,5 @@
//
// Copyright © 2020 Anticrm Platform Contributors.
// Copyright © 2022 Hardcore Engineering Inc.
//
// Licensed under the Eclipse Public License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License. You may
@ -13,11 +13,10 @@
// limitations under the License.
//
import type { Tx, Ref, Doc, Class, DocumentQuery, FindResult, FindOptions, TxResult } from '@anticrm/core'
import { ModelDb, TxDb, Hierarchy } from '@anticrm/core'
import type { DbAdapter, TxAdapter } from '@anticrm/server-core'
import type { Class, Doc, DocumentQuery, FindOptions, FindResult, Ref, Tx, TxResult } from '@anticrm/core'
import { Hierarchy, TxDb } from '@anticrm/core'
import builder from '@anticrm/model-all'
import type { TxAdapter } from '@anticrm/server-core'
class InMemoryTxAdapter implements TxAdapter {
private readonly txdb: TxDb
@ -47,40 +46,9 @@ class InMemoryTxAdapter implements TxAdapter {
async close (): Promise<void> {}
}
class InMemoryAdapter implements DbAdapter {
private readonly modeldb: ModelDb
constructor (hierarchy: Hierarchy) {
this.modeldb = new ModelDb(hierarchy)
}
async findAll<T extends Doc> (_class: Ref<Class<T>>, query: DocumentQuery<T>, options?: FindOptions<T>): Promise<FindResult<T>> {
return await this.modeldb.findAll(_class, query, options)
}
async tx (tx: Tx): Promise<TxResult> {
return await this.modeldb.tx(tx)
}
async init (model: Tx[]): Promise<void> {
for (const tx of model) {
await this.modeldb.tx(tx)
}
}
async close (): Promise<void> {}
}
/**
* @public
*/
export async function createInMemoryTxAdapter (hierarchy: Hierarchy, url: string, workspace: string): Promise<TxAdapter> {
return new InMemoryTxAdapter(hierarchy)
}
/**
* @public
*/
export async function createInMemoryAdapter (hierarchy: Hierarchy, url: string, db: string): Promise<DbAdapter> {
return new InMemoryAdapter(hierarchy)
}

View File

@ -33,6 +33,7 @@ import {
TVersion
} from './core'
import { TAccount, TSpace } from './security'
import { TUserStatus } from './transient'
import {
TTx,
TTxBulkWrite,
@ -83,6 +84,7 @@ export function createModel (builder: Builder): void {
TVersion,
TTypeNumber,
TTypeIntlString,
TPluginConfiguration
TPluginConfiguration,
TUserStatus
)
}

View File

@ -0,0 +1,24 @@
//
// Copyright © 2022 Hardcore Engineering Inc.
//
// Licensed under the Eclipse Public License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License. You may
// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//
// See the License for the specific language governing permissions and
// limitations under the License.
//
import { DOMAIN_TRANSIENT, UserStatus } from '@anticrm/core'
import { Model } from '@anticrm/model'
import core from './component'
import { TDoc } from './core'
@Model(core.class.UserStatus, core.class.Doc, DOMAIN_TRANSIENT)
export class TUserStatus extends TDoc implements UserStatus {
online!: boolean
}

View File

@ -227,6 +227,11 @@ export interface ArrOf<T extends PropertyType> extends Type<T[]> {
*/
export const DOMAIN_MODEL = 'model' as Domain
/**
* @public
*/
export const DOMAIN_TRANSIENT = 'transient' as Domain
// S P A C E
/**
@ -247,6 +252,13 @@ export interface Account extends Doc {
email: string
}
/**
* @public
*/
export interface UserStatus extends Doc {
online: boolean
}
/**
* @public
*/

View File

@ -15,7 +15,7 @@
import type { IntlString, Plugin, StatusCode } from '@anticrm/platform'
import { plugin } from '@anticrm/platform'
import { Mixin, Version } from '.'
import type { Account, AnyAttribute, ArrOf, AttachedDoc, Class, Collection, Doc, Interface, Obj, PluginConfiguration, PropertyType, Ref, RefTo, Space, Timestamp, Type } from './classes'
import type { Account, AnyAttribute, ArrOf, AttachedDoc, Class, Collection, Doc, Interface, Obj, PluginConfiguration, PropertyType, Ref, RefTo, Space, Timestamp, Type, UserStatus } from './classes'
import type { Tx, TxBulkWrite, TxCollectionCUD, TxCreateDoc, TxCUD, TxMixin, TxPutBag, TxRemoveDoc, TxUpdateDoc } from './tx'
/**
@ -55,7 +55,8 @@ export default plugin(coreId, {
Collection: '' as Ref<Class<Collection<AttachedDoc>>>,
Bag: '' as Ref<Class<Type<Record<string, PropertyType>>>>,
Version: '' as Ref<Class<Version>>,
PluginConfiguration: '' as Ref<Class<PluginConfiguration>>
PluginConfiguration: '' as Ref<Class<PluginConfiguration>>,
UserStatus: '' as Ref<Class<UserStatus>>
},
space: {
Tx: '' as Ref<Space>,

View File

@ -109,6 +109,10 @@ export async function CommentDelete (tx: Tx, control: TriggerControl): Promise<T
objectId: rmTx.objectId
}, { limit: 1 }))[0]
if (createTx === undefined) {
return []
}
const comment = TxProcessor.createDoc2Doc(createTx as TxCreateDoc<ThreadMessage>)
const comments = await control.findAll(chunter.class.ThreadMessage, {
@ -149,7 +153,7 @@ export async function MessageCreate (tx: Tx, control: TriggerControl): Promise<T
_id: message.space
}, { limit: 1 }))[0]
if (channel.lastMessage === undefined || channel.lastMessage < message.createOn) {
if (channel?.lastMessage === undefined || channel.lastMessage < message.createOn) {
const res = control.txFactory.createTxUpdateDoc<ChunterSpace>(channel._class, channel.space, channel._id, {
lastMessage: message.createOn
})
@ -173,13 +177,17 @@ export async function MessageDelete (tx: Tx, control: TriggerControl): Promise<T
objectId: rmTx.objectId
}, { limit: 1 }))[0]
if (createTx === undefined) {
return []
}
const message = TxProcessor.createDoc2Doc(createTx as TxCreateDoc<Message>)
const channel = (await control.findAll(chunter.class.ChunterSpace, {
_id: message.space
}, { limit: 1 }))[0]
if (channel.lastMessage === message.createOn) {
if (channel?.lastMessage === message.createOn) {
const messages = await control.findAll(chunter.class.Message, {
attachedTo: channel._id
})

View File

@ -109,7 +109,7 @@ export async function UpdateLastView (tx: Tx, control: TriggerControl): Promise<
case core.class.TxMixin: {
const tx = actualTx as TxCUD<Doc>
const doc = (await control.findAll(tx.objectClass, { _id: tx.objectId }, { limit: 1 }))[0]
if (!control.hierarchy.isDerived(doc._class, core.class.AttachedDoc)) {
if (doc !== undefined && !control.hierarchy.isDerived(doc._class, core.class.AttachedDoc)) {
const resTx = await getUpdateLastViewTx(control.findAll, doc._id, doc._class, tx.modifiedOn, tx.modifiedBy)
if (resTx !== undefined) {
result.push(resTx)

View File

@ -0,0 +1,91 @@
//
// Copyright © 2022 Hardcore Engineering Inc.
//
// Licensed under the Eclipse Public License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License. You may
// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//
// See the License for the specific language governing permissions and
// limitations under the License.
//
import {
Class,
Doc,
DocumentQuery,
FindOptions,
FindResult,
Hierarchy,
ModelDb,
Ref,
Tx,
TxResult
} from '@anticrm/core'
/**
* @public
*/
export interface DbAdapter {
/**
* Method called after hierarchy is ready to use.
*/
init: (model: Tx[]) => Promise<void>
close: () => Promise<void>
findAll: <T extends Doc>(_class: Ref<Class<T>>, query: DocumentQuery<T>, options?: FindOptions<T>) => Promise<FindResult<T>>
tx: (tx: Tx) => Promise<TxResult>
}
/**
* @public
*/
export interface TxAdapter extends DbAdapter {
getModel: () => Promise<Tx[]>
}
/**
* @public
*/
export type DbAdapterFactory = (hierarchy: Hierarchy, url: string, db: string, modelDb: ModelDb) => Promise<DbAdapter>
/**
* @public
*/
export interface DbAdapterConfiguration {
factory: DbAdapterFactory
url: string
}
class InMemoryAdapter implements DbAdapter {
private readonly modeldb: ModelDb
constructor (hierarchy: Hierarchy) {
this.modeldb = new ModelDb(hierarchy)
}
async findAll<T extends Doc> (_class: Ref<Class<T>>, query: DocumentQuery<T>, options?: FindOptions<T>): Promise<FindResult<T>> {
return await this.modeldb.findAll(_class, query, options)
}
async tx (tx: Tx): Promise<TxResult> {
return await this.modeldb.tx(tx)
}
async init (model: Tx[]): Promise<void> {
for (const tx of model) {
await this.modeldb.tx(tx)
}
}
async close (): Promise<void> {}
}
/**
* @public
*/
export async function createInMemoryAdapter (hierarchy: Hierarchy, url: string, db: string): Promise<DbAdapter> {
return new InMemoryAdapter(hierarchy)
}

View File

@ -14,6 +14,7 @@
// limitations under the License.
//
export * from './adapter'
export * from './types'
export * from './fulltext'
export * from './storage'

View File

@ -13,9 +13,20 @@
// limitations under the License.
//
import { Class, Doc, DocumentQuery, FindOptions, FindResult, Ref, ServerStorage, Tx, TxResult } from '@anticrm/core'
import { Pipeline, Middleware, MiddlewareCreator, SessionContext } from './types'
import {
Class,
Doc,
DocumentQuery,
FindOptions,
FindResult,
ModelDb,
Ref,
ServerStorage,
Tx,
TxResult
} from '@anticrm/core'
import { createServerStorage, DbConfiguration } from './storage'
import { Middleware, MiddlewareCreator, Pipeline, SessionContext } from './types'
/**
* @public
@ -27,8 +38,10 @@ export async function createPipeline (conf: DbConfiguration, constructors: Middl
class TPipeline implements Pipeline {
private readonly head: Middleware | undefined
readonly modelDb: ModelDb
constructor (private readonly storage: ServerStorage, constructors: MiddlewareCreator[]) {
this.head = this.buildChain(constructors)
this.modelDb = storage.modelDb
}
private buildChain (constructors: MiddlewareCreator[]): Middleware | undefined {
@ -40,13 +53,14 @@ class TPipeline implements Pipeline {
return current
}
async findAll <T extends Doc>(
async findAll<T extends Doc>(
ctx: SessionContext,
_class: Ref<Class<T>>,
query: DocumentQuery<T>,
options?: FindOptions<T>
): Promise<FindResult<T>> {
const [session, resClass, resQuery, resOptions] = this.head === undefined ? [ctx, _class, query, options] : await this.head.findAll(ctx, _class, query, options)
const [session, resClass, resQuery, resOptions] =
this.head === undefined ? [ctx, _class, query, options] : await this.head.findAll(ctx, _class, query, options)
return await this.storage.findAll(session, resClass, resQuery, resOptions)
}

View File

@ -43,44 +43,12 @@ import core, {
} from '@anticrm/core'
import { getResource } from '@anticrm/platform'
import type { Client as MinioClient } from 'minio'
import { DbAdapter, DbAdapterConfiguration, TxAdapter } from './adapter'
import { FullTextIndex } from './fulltext'
import serverCore from './plugin'
import { Triggers } from './triggers'
import type { FullTextAdapter, FullTextAdapterFactory, ObjectDDParticipant } from './types'
/**
* @public
*/
export interface DbAdapter {
/**
* Method called after hierarchy is ready to use.
*/
init: (model: Tx[]) => Promise<void>
close: () => Promise<void>
findAll: <T extends Doc>(_class: Ref<Class<T>>, query: DocumentQuery<T>, options?: FindOptions<T>) => Promise<FindResult<T>>
tx: (tx: Tx) => Promise<TxResult>
}
/**
* @public
*/
export interface TxAdapter extends DbAdapter {
getModel: () => Promise<Tx[]>
}
/**
* @public
*/
export type DbAdapterFactory = (hierarchy: Hierarchy, url: string, db: string, modelDb: ModelDb) => Promise<DbAdapter>
/**
* @public
*/
export interface DbAdapterConfiguration {
factory: DbAdapterFactory
url: string
}
/**
* @public
*/

View File

@ -13,7 +13,24 @@
// limitations under the License.
//
import type { Account, Class, Doc, DocumentQuery, FindOptions, FindResult, MeasureContext, ModelDb, Obj, Ref, ServerStorage, Space, Storage, Timestamp, Tx, TxResult } from '@anticrm/core'
import type {
Account,
Class,
Doc,
DocumentQuery,
FindOptions,
FindResult,
MeasureContext,
ModelDb,
Obj,
Ref,
ServerStorage,
Space,
Storage,
Timestamp,
Tx,
TxResult
} from '@anticrm/core'
import { Hierarchy, TxFactory } from '@anticrm/core'
import type { Resource } from '@anticrm/platform'
import type { Client as MinioClient } from 'minio'
@ -30,7 +47,12 @@ export interface SessionContext extends MeasureContext {
*/
export interface Middleware {
tx: (ctx: SessionContext, tx: Tx) => Promise<TxMiddlewareResult>
findAll: <T extends Doc>(ctx: SessionContext, _class: Ref<Class<T>>, query: DocumentQuery<T>, options?: FindOptions<T>) => Promise<FindAllMiddlewareResult<T>>
findAll: <T extends Doc>(
ctx: SessionContext,
_class: Ref<Class<T>>,
query: DocumentQuery<T>,
options?: FindOptions<T>
) => Promise<FindAllMiddlewareResult<T>>
}
/**
@ -46,12 +68,18 @@ export type TxMiddlewareResult = [SessionContext, Tx, string | undefined]
/**
* @public
*/
export type FindAllMiddlewareResult<T extends Doc> = [SessionContext, Ref<Class<T>>, DocumentQuery<T>, FindOptions<T> | undefined]
export type FindAllMiddlewareResult<T extends Doc> = [
SessionContext,
Ref<Class<T>>,
DocumentQuery<T>,
FindOptions<T> | undefined
]
/**
* @public
*/
export interface Pipeline {
modelDb: ModelDb
findAll: <T extends Doc>(
ctx: SessionContext,
_class: Ref<Class<T>>,
@ -112,7 +140,12 @@ export interface FullTextAdapter {
index: (doc: IndexedDoc) => Promise<TxResult>
update: (id: Ref<Doc>, update: Record<string, any>) => Promise<TxResult>
remove: (id: Ref<Doc>) => Promise<void>
search: (_classes: Ref<Class<Doc>>[], search: DocumentQuery<Doc>, size: number | undefined, from?: number) => Promise<IndexedDoc[]>
search: (
_classes: Ref<Class<Doc>>[],
search: DocumentQuery<Doc>,
size: number | undefined,
from?: number
) => Promise<IndexedDoc[]>
close: () => Promise<void>
}
@ -125,7 +158,12 @@ export type FullTextAdapterFactory = (url: string, workspace: string) => Promise
* @public
*/
export interface WithFind {
findAll: <T extends Doc> (ctx: MeasureContext, clazz: Ref<Class<T>>, query: DocumentQuery<T>, options?: FindOptions<T>) => Promise<FindResult<T>>
findAll: <T extends Doc>(
ctx: MeasureContext,
clazz: Ref<Class<T>>,
query: DocumentQuery<T>,
options?: FindOptions<T>
) => Promise<FindResult<T>>
}
/**
@ -134,5 +172,15 @@ export interface WithFind {
*/
export interface ObjectDDParticipant extends Class<Obj> {
// Collect more items to be deleted if parent document is deleted.
collectDocs: Resource<(doc: Doc, hiearachy: Hierarchy, findAll: <T extends Doc> (clazz: Ref<Class<T>>, query: DocumentQuery<T>, options?: FindOptions<T>) => Promise<FindResult<T>>) => Promise<Doc[]>>
collectDocs: Resource<
(
doc: Doc,
hiearachy: Hierarchy,
findAll: <T extends Doc>(
clazz: Ref<Class<T>>,
query: DocumentQuery<T>,
options?: FindOptions<T>
) => Promise<FindResult<T>>
) => Promise<Doc[]>
>
}

View File

@ -18,6 +18,7 @@ import {
Doc,
DocumentQuery,
DOMAIN_MODEL,
DOMAIN_TRANSIENT,
DOMAIN_TX,
FindOptions,
FindResult,
@ -34,7 +35,7 @@ import { serverAttachmentId } from '@anticrm/server-attachment'
import { serverCalendarId } from '@anticrm/server-calendar'
import { serverChunterId } from '@anticrm/server-chunter'
import { serverContactId } from '@anticrm/server-contact'
import { createPipeline, DbAdapter, DbConfiguration, MiddlewareCreator } from '@anticrm/server-core'
import { createInMemoryAdapter, createPipeline, DbAdapter, DbConfiguration, MiddlewareCreator } from '@anticrm/server-core'
import { serverGmailId } from '@anticrm/server-gmail'
import { serverInventoryId } from '@anticrm/server-inventory'
import { serverLeadId } from '@anticrm/server-lead'
@ -113,6 +114,7 @@ export function start (
const conf: DbConfiguration = {
domains: {
[DOMAIN_TX]: 'MongoTx',
[DOMAIN_TRANSIENT]: 'InMemory',
[DOMAIN_MODEL]: 'Null'
},
defaultAdapter: 'Mongo',
@ -128,6 +130,10 @@ export function start (
Null: {
factory: createNullAdapter,
url: ''
},
InMemory: {
factory: createInMemoryAdapter,
url: ''
}
},
fulltextAdapter: {

View File

@ -0,0 +1,198 @@
//
// Copyright © 2020 Anticrm Platform Contributors.
//
// Licensed under the Eclipse Public License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License. You may
// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//
// See the License for the specific language governing permissions and
// limitations under the License.
//
import type { Account, Arr, Class, Data, Doc, Mixin, Obj, Ref, TxCreateDoc, TxCUD } from '@anticrm/core'
import core, { AttachedDoc, ClassifierKind, DOMAIN_MODEL, DOMAIN_TX, TxFactory } from '@anticrm/core'
import type { IntlString, Plugin } from '@anticrm/platform'
import { plugin } from '@anticrm/platform'
export const txFactory = new TxFactory(core.account.System)
export function createClass (_class: Ref<Class<Obj>>, attributes: Data<Class<Obj>>): TxCreateDoc<Doc> {
return txFactory.createTxCreateDoc(core.class.Class, core.space.Model, attributes, _class)
}
/**
* @public
*/
export function createDoc<T extends Doc> (
_class: Ref<Class<T>>,
attributes: Data<T>,
id?: Ref<T>,
modifiedBy?: Ref<Account>
): TxCreateDoc<Doc> {
const result = txFactory.createTxCreateDoc(_class, core.space.Model, attributes, id)
if (modifiedBy !== undefined) {
result.modifiedBy = modifiedBy
}
return result
}
/**
* @public
*/
export interface TestMixin extends Doc {
arr: Arr<string>
}
/**
* @public
*/
export interface AttachedComment extends AttachedDoc {
message: string
}
/**
* @public
*/
export const test = plugin('test' as Plugin, {
mixin: {
TestMixin: '' as Ref<Mixin<TestMixin>>
},
class: {
TestComment: '' as Ref<Class<AttachedComment>>
}
})
/**
* @public
* Generate minimal model for testing purposes.
* @returns R
*/
export function genMinModel (): TxCUD<Doc>[] {
const txes = []
// Fill Tx'es with basic model classes.
txes.push(createClass(core.class.Obj, { label: 'Obj' as IntlString, kind: ClassifierKind.CLASS }))
txes.push(
createClass(core.class.Doc, { label: 'Doc' as IntlString, extends: core.class.Obj, kind: ClassifierKind.CLASS })
)
txes.push(
createClass(core.class.AttachedDoc, {
label: 'AttachedDoc' as IntlString,
extends: core.class.Doc,
kind: ClassifierKind.MIXIN
})
)
txes.push(
createClass(core.class.Class, {
label: 'Class' as IntlString,
extends: core.class.Doc,
kind: ClassifierKind.CLASS,
domain: DOMAIN_MODEL
})
)
txes.push(
createClass(core.class.Space, {
label: 'Space' as IntlString,
extends: core.class.Doc,
kind: ClassifierKind.CLASS,
domain: DOMAIN_MODEL
})
)
txes.push(
createClass(core.class.Account, {
label: 'Account' as IntlString,
extends: core.class.Doc,
kind: ClassifierKind.CLASS,
domain: DOMAIN_MODEL
})
)
txes.push(
createClass(core.class.Tx, {
label: 'Tx' as IntlString,
extends: core.class.Doc,
kind: ClassifierKind.CLASS,
domain: DOMAIN_TX
})
)
txes.push(
createClass(core.class.TxCUD, {
label: 'TxCUD' as IntlString,
extends: core.class.Tx,
kind: ClassifierKind.CLASS,
domain: DOMAIN_TX
})
)
txes.push(
createClass(core.class.TxCreateDoc, {
label: 'TxCreateDoc' as IntlString,
extends: core.class.TxCUD,
kind: ClassifierKind.CLASS
})
)
txes.push(
createClass(core.class.TxUpdateDoc, {
label: 'TxUpdateDoc' as IntlString,
extends: core.class.TxCUD,
kind: ClassifierKind.CLASS
})
)
txes.push(
createClass(core.class.TxRemoveDoc, {
label: 'TxRemoveDoc' as IntlString,
extends: core.class.TxCUD,
kind: ClassifierKind.CLASS
})
)
txes.push(
createClass(core.class.TxCollectionCUD, {
label: 'TxCollectionCUD' as IntlString,
extends: core.class.TxCUD,
kind: ClassifierKind.CLASS
})
)
txes.push(
createClass(test.mixin.TestMixin, {
label: 'TestMixin' as IntlString,
extends: core.class.Doc,
kind: ClassifierKind.MIXIN
})
)
txes.push(
createClass(test.class.TestComment, {
label: 'TestComment' as IntlString,
extends: core.class.AttachedDoc,
kind: ClassifierKind.CLASS
})
)
const u1 = 'User1' as Ref<Account>
const u2 = 'User2' as Ref<Account>
txes.push(
createDoc(core.class.Account, { email: 'user1@site.com' }, u1),
createDoc(core.class.Account, { email: 'user2@site.com' }, u2),
createDoc(core.class.Space, {
name: 'Sp1',
description: '',
private: false,
archived: false,
members: [u1, u2]
})
)
txes.push(
createDoc(core.class.Space, {
name: 'Sp2',
description: '',
private: false,
archived: false,
members: [u1]
})
)
return txes
}

View File

@ -19,7 +19,7 @@ import { start, disableLogging } from '../server'
import { generateToken } from '@anticrm/server-token'
import WebSocket from 'ws'
import type {
import {
Doc,
Ref,
Class,
@ -27,17 +27,35 @@ import type {
FindOptions,
FindResult,
Tx,
TxResult
TxResult,
ModelDb,
MeasureMetricsContext,
toFindResult,
Hierarchy
} from '@anticrm/core'
import { MeasureMetricsContext, toFindResult } from '@anticrm/core'
import { SessionContext } from '@anticrm/server-core'
import { genMinModel } from './minmodel'
describe('server', () => {
disableLogging()
async function getModelDb (): Promise<ModelDb> {
const txes = genMinModel()
const hierarchy = new Hierarchy()
for (const tx of txes) {
hierarchy.tx(tx)
}
const modelDb = new ModelDb(hierarchy)
for (const tx of txes) {
await modelDb.tx(tx)
}
return modelDb
}
start(
new MeasureMetricsContext('test', {}),
async () => ({
modelDb: await getModelDb(),
findAll: async <T extends Doc>(
ctx: SessionContext,
_class: Ref<Class<T>>,

View File

@ -13,7 +13,19 @@
// limitations under the License.
//
import { Class, Doc, DocumentQuery, FindOptions, FindResult, MeasureContext, Ref, Tx, TxResult } from '@anticrm/core'
import core, {
Class,
Doc,
DocumentQuery,
FindOptions,
FindResult,
MeasureContext,
ModelDb,
Ref,
Space,
Tx, TxFactory,
TxResult
} from '@anticrm/core'
import { readRequest, Response, serialize, unknownError } from '@anticrm/platform'
import type { Pipeline, SessionContext } from '@anticrm/server-core'
import { decodeToken, Token } from '@anticrm/server-token'
@ -22,22 +34,36 @@ import WebSocket, { Server } from 'ws'
let LOGGING_ENABLED = true
export function disableLogging (): void { LOGGING_ENABLED = false }
export function disableLogging (): void {
LOGGING_ENABLED = false
}
class Session {
readonly modelDb: ModelDb
constructor (
private readonly manager: SessionManager,
private readonly token: Token,
private readonly pipeline: Pipeline
) {}
) {
this.modelDb = pipeline.modelDb
}
getUser (): string {
return this.token.email
}
async ping (): Promise<string> { console.log('ping'); return 'pong!' }
async ping (): Promise<string> {
console.log('ping')
return 'pong!'
}
async findAll <T extends Doc>(ctx: MeasureContext, _class: Ref<Class<T>>, query: DocumentQuery<T>, options?: FindOptions<T>): Promise<FindResult<T>> {
async findAll<T extends Doc>(
ctx: MeasureContext,
_class: Ref<Class<T>>,
query: DocumentQuery<T>,
options?: FindOptions<T>
): Promise<FindResult<T>> {
const context = ctx as SessionContext
context.userEmail = this.token.email
return await this.pipeline.findAll(context, _class, query, options)
@ -48,9 +74,9 @@ class Session {
context.userEmail = this.token.email
const [result, derived, target] = await this.pipeline.tx(context, tx)
this.manager.broadcast(this, this.token, { result: tx }, target)
this.manager.broadcast(this, this.token.workspace, { result: tx }, target)
for (const dtx of derived) {
this.manager.broadcast(null, this.token, { result: dtx }, target)
this.manager.broadcast(null, this.token.workspace, { result: dtx }, target)
}
return result
}
@ -64,10 +90,15 @@ interface Workspace {
class SessionManager {
private readonly workspaces = new Map<string, Workspace>()
async addSession (ws: WebSocket, token: Token, pipelineFactory: (ws: string) => Promise<Pipeline>): Promise<Session> {
async addSession (
ctx: MeasureContext,
ws: WebSocket,
token: Token,
pipelineFactory: (ws: string) => Promise<Pipeline>
): Promise<Session> {
const workspace = this.workspaces.get(token.workspace)
if (workspace === undefined) {
return await this.createWorkspace(pipelineFactory, token, ws)
return await this.createWorkspace(ctx, pipelineFactory, token, ws)
} else {
if (token.extra?.model === 'reload') {
console.log('reloading workspace', JSON.stringify(token))
@ -75,19 +106,56 @@ class SessionManager {
// Drop all existing clients
if (workspace.sessions.length > 0) {
for (const s of workspace.sessions) {
this.close(s[1], token.workspace, 0, 'upgrade')
await this.close(ctx, s[1], token.workspace, 0, 'upgrade')
}
}
return await this.createWorkspace(pipelineFactory, token, ws)
return await this.createWorkspace(ctx, pipelineFactory, token, ws)
}
const session = new Session(this, token, workspace.pipeline)
workspace.sessions.push([session, ws])
await this.setStatus(ctx, session, true)
return session
}
}
private async createWorkspace (pipelineFactory: (ws: string) => Promise<Pipeline>, token: Token, ws: WebSocket): Promise<Session> {
private async setStatus (ctx: MeasureContext, session: Session, online: boolean): Promise<void> {
try {
const user = (
await session.modelDb.findAll(
core.class.Account,
{
email: session.getUser()
},
{ limit: 1 }
)
)[0]
if (user === undefined) return
const status = (await session.findAll(ctx, core.class.UserStatus, { modifiedBy: user._id }, { limit: 1 }))[0]
const txFactory = new TxFactory(user._id)
if (status === undefined) {
const tx = txFactory.createTxCreateDoc(core.class.UserStatus, user._id as string as Ref<Space>, {
online
})
tx.space = core.space.DerivedTx
await session.tx(ctx, tx)
} else if (status.online !== online) {
const tx = txFactory.createTxUpdateDoc(status._class, status.space, status._id, {
online
})
tx.space = core.space.DerivedTx
await session.tx(ctx, tx)
}
} catch {
}
}
private async createWorkspace (
ctx: MeasureContext,
pipelineFactory: (ws: string) => Promise<Pipeline>,
token: Token,
ws: WebSocket
): Promise<Session> {
const pipeline = await pipelineFactory(token.workspace)
const session = new Session(this, token, pipeline)
const workspace: Workspace = {
@ -95,26 +163,36 @@ class SessionManager {
sessions: [[session, ws]]
}
this.workspaces.set(token.workspace, workspace)
await this.setStatus(ctx, session, true)
return session
}
close (ws: WebSocket, workspaceId: string, code: number, reason: string): void {
async close (ctx: MeasureContext, ws: WebSocket, workspaceId: string, code: number, reason: string): Promise<void> {
if (LOGGING_ENABLED) console.log(`closing websocket, code: ${code}, reason: ${reason}`)
const workspace = this.workspaces.get(workspaceId)
if (workspace === undefined) {
console.error(new Error('internal: cannot find sessions'))
return
}
workspace.sessions = workspace.sessions.filter(session => session[1] !== ws)
if (workspace.sessions.length === 0) {
if (LOGGING_ENABLED) console.log('no sessions for workspace', workspaceId)
this.workspaces.delete(workspaceId)
workspace.pipeline.close().catch(err => console.error(err))
const index = workspace.sessions.findIndex((p) => p[1] === ws)
if (index !== -1) {
const session = workspace.sessions[index]
workspace.sessions.splice(index, 1)
const user = session[0].getUser()
const another = workspace.sessions.findIndex((p) => p[0].getUser() === user)
if (another === -1) {
await this.setStatus(ctx, session[0], false)
}
if (workspace.sessions.length === 0) {
if (LOGGING_ENABLED) console.log('no sessions for workspace', workspaceId)
this.workspaces.delete(workspaceId)
workspace.pipeline.close().catch((err) => console.error(err))
}
}
}
broadcast (from: Session | null, token: Token, resp: Response<any>, target?: string): void {
const workspace = this.workspaces.get(token.workspace)
broadcast (from: Session | null, workspaceId: string, resp: Response<any>, target?: string): void {
const workspace = this.workspaces.get(workspaceId)
if (workspace === undefined) {
console.error(new Error('internal: cannot find sessions'))
return
@ -133,7 +211,12 @@ class SessionManager {
}
}
async function handleRequest<S extends Session> (ctx: MeasureContext, service: S, ws: WebSocket, msg: string): Promise<void> {
async function handleRequest<S extends Session> (
ctx: MeasureContext,
service: S,
ws: WebSocket,
msg: string
): Promise<void> {
const request = readRequest(msg)
const f = (service as any)[request.method]
try {
@ -156,7 +239,12 @@ async function handleRequest<S extends Session> (ctx: MeasureContext, service: S
* @param port -
* @param host -
*/
export function start (ctx: MeasureContext, pipelineFactory: (workspace: string) => Promise<Pipeline>, port: number, host?: string): () => void {
export function start (
ctx: MeasureContext,
pipelineFactory: (workspace: string) => Promise<Pipeline>,
port: number,
host?: string
): () => void {
console.log(`starting server on port ${port} ...`)
const sessions = new SessionManager()
@ -166,11 +254,14 @@ export function start (ctx: MeasureContext, pipelineFactory: (workspace: string)
wss.on('connection', async (ws: WebSocket, request: any, token: Token) => {
const buffer: string[] = []
ws.on('message', (msg: string) => { buffer.push(msg) })
const session = await sessions.addSession(ws, token, pipelineFactory)
ws.on('message', (msg: string) => {
buffer.push(msg)
})
const session = await sessions.addSession(ctx, ws, token, pipelineFactory)
// eslint-disable-next-line @typescript-eslint/no-misused-promises
ws.on('message', async (msg: string) => await handleRequest(ctx, session, ws, msg))
ws.on('close', (code: number, reason: string) => sessions.close(ws, token.workspace, code, reason))
// eslint-disable-next-line @typescript-eslint/no-misused-promises
ws.on('close', async (code: number, reason: string) => await sessions.close(ctx, ws, token.workspace, code, reason))
for (const msg of buffer) {
await handleRequest(ctx, session, ws, msg)
@ -183,7 +274,7 @@ export function start (ctx: MeasureContext, pipelineFactory: (workspace: string)
try {
const payload = decodeToken(token ?? '')
console.log('client connected with payload', payload)
wss.handleUpgrade(request, socket, head, ws => wss.emit('connection', ws, request, payload))
wss.handleUpgrade(request, socket, head, (ws) => wss.emit('connection', ws, request, payload))
} catch (err) {
console.log('unauthorized client')
socket.write('HTTP/1.1 401 Unauthorized\r\n\r\n')