UBERF-7684: Workspace service (#6460)

Signed-off-by: Alexey Zinoviev <alexey.zinoviev@xored.com>
This commit is contained in:
Alexey Zinoviev 2024-09-02 20:12:04 +04:00 committed by GitHub
parent 069c69b7db
commit db94cab1e0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
52 changed files with 2199 additions and 1080 deletions

34
.vscode/launch.json vendored
View File

@ -48,7 +48,7 @@
"ACCOUNTS_URL": "http://localhost:3000",
// "SERVER_PROVIDER":"uweb"
"SERVER_PROVIDER":"ws",
"MODEL_VERSION": "",
"MODEL_VERSION": "v0.6.287",
"ELASTIC_INDEX_NAME": "local_storage_index",
"UPLOAD_URL": "/files",
@ -75,17 +75,47 @@
"TRANSACTOR_URL": "ws://localhost:3333",
"ACCOUNT_PORT": "3000",
"FRONT_URL": "http://localhost:8080",
"outputCapture": "std",
"SES_URL": "",
"MINIO_ACCESS_KEY": "minioadmin",
"MINIO_SECRET_KEY": "minioadmin",
"MINIO_ENDPOINT": "localhost"
// "INIT_SCRIPT_URL": "https://raw.githubusercontent.com/hcengineering/init/main/script.yaml",
// "INIT_WORKSPACE": "onboarding",
},
"runtimeVersion": "20",
"runtimeArgs": ["--nolazy", "-r", "ts-node/register"],
"sourceMaps": true,
"outputCapture": "std",
"cwd": "${workspaceRoot}/pods/account",
"protocol": "inspector"
},
{
"name": "Debug Workspace",
"type": "node",
"request": "launch",
"args": ["src/__start.ts"],
"env": {
"MONGO_URL": "mongodb://localhost:27017",
"SERVER_SECRET": "secret",
"TRANSACTOR_URL": "ws://localhost:3333",
"ACCOUNTS_URL": "http://localhost:3000",
"FRONT_URL": "http://localhost:8080",
"SES_URL": "",
"MINIO_ACCESS_KEY": "minioadmin",
"MINIO_SECRET_KEY": "minioadmin",
"MINIO_ENDPOINT": "localhost",
"MODEL_VERSION": "v0.6.286",
// "INIT_SCRIPT_URL": "https://raw.githubusercontent.com/hcengineering/init/main/script.yaml",
// "INIT_WORKSPACE": "onboarding",
"NOTIFY_INBOX_ONLY": "true"
},
"runtimeVersion": "20",
"runtimeArgs": ["--nolazy", "-r", "ts-node/register"],
"sourceMaps": true,
"outputCapture": "std",
"cwd": "${workspaceRoot}/pods/workspace",
"protocol": "inspector"
},
{
"name": "Debug Front",
"type": "node",

View File

@ -626,6 +626,9 @@ dependencies:
'@rush-temp/pod-telegram-bot':
specifier: file:./projects/pod-telegram-bot.tgz
version: file:projects/pod-telegram-bot.tgz(bufferutil@4.0.8)(utf-8-validate@6.0.4)
'@rush-temp/pod-workspace':
specifier: file:./projects/pod-workspace.tgz
version: file:projects/pod-workspace.tgz
'@rush-temp/preference':
specifier: file:./projects/preference.tgz
version: file:projects/preference.tgz(@types/node@20.11.19)(esbuild@0.20.1)(ts-node@10.9.2)
@ -1064,6 +1067,9 @@ dependencies:
'@rush-temp/workbench-resources':
specifier: file:./projects/workbench-resources.tgz
version: file:projects/workbench-resources.tgz(@types/node@20.11.19)(esbuild@0.20.1)(postcss-load-config@4.0.2)(postcss@8.4.35)(ts-node@10.9.2)
'@rush-temp/workspace-service':
specifier: file:./projects/workspace-service.tgz
version: file:projects/workspace-service.tgz
'@sentry/node':
specifier: ^7.103.0
version: 7.118.0
@ -30132,6 +30138,46 @@ packages:
- utf-8-validate
dev: false
file:projects/pod-workspace.tgz:
resolution: {integrity: sha512-3hJxKzpd+0/zfr++zNy57willy+3axei4d1ehwmhxivlQh0/iKsV2RvKCMD7q74b/grO+gewyFY7XsVzBlkvrg==, tarball: file:projects/pod-workspace.tgz}
name: '@rush-temp/pod-workspace'
version: 0.0.0
dependencies:
'@types/jest': 29.5.12
'@types/node': 20.11.19
'@typescript-eslint/eslint-plugin': 6.21.0(@typescript-eslint/parser@6.21.0)(eslint@8.56.0)(typescript@5.3.3)
'@typescript-eslint/parser': 6.21.0(eslint@8.56.0)(typescript@5.3.3)
cross-env: 7.0.3
esbuild: 0.20.1
eslint: 8.56.0
eslint-config-standard-with-typescript: 40.0.0(@typescript-eslint/eslint-plugin@6.21.0)(eslint-plugin-import@2.29.1)(eslint-plugin-n@15.7.0)(eslint-plugin-promise@6.1.1)(eslint@8.56.0)(typescript@5.3.3)
eslint-plugin-import: 2.29.1(eslint@8.56.0)
eslint-plugin-n: 15.7.0(eslint@8.56.0)
eslint-plugin-promise: 6.1.1(eslint@8.56.0)
jest: 29.7.0(@types/node@20.11.19)(ts-node@10.9.2)
mongodb: 6.8.0
prettier: 3.2.5
ts-jest: 29.1.2(esbuild@0.20.1)(jest@29.7.0)(typescript@5.3.3)
ts-node: 10.9.2(@types/node@20.11.19)(typescript@5.3.3)
typescript: 5.3.3
transitivePeerDependencies:
- '@aws-sdk/credential-providers'
- '@babel/core'
- '@jest/types'
- '@mongodb-js/zstd'
- '@swc/core'
- '@swc/wasm'
- babel-jest
- babel-plugin-macros
- gcp-metadata
- kerberos
- mongodb-client-encryption
- node-notifier
- snappy
- socks
- supports-color
dev: false
file:projects/preference-assets.tgz(esbuild@0.20.1)(ts-node@10.9.2):
resolution: {integrity: sha512-VlBSKBg3XmuMLtxNAS703aS+dhhb5a7H5Ns2nzhhv7w3KlAqtwp6cQ5VLxceNuRaPbTtI+2K+YkjFb2S1ld5VQ==, tarball: file:projects/preference-assets.tgz}
id: file:projects/preference-assets.tgz
@ -35221,3 +35267,44 @@ packages:
- supports-color
- ts-node
dev: false
file:projects/workspace-service.tgz:
resolution: {integrity: sha512-WUCtvidfvVcahSFbmbtTZeGvedNNsG4RERSfnG+MWuDnDfyFAYnpBVest9gyO2/jH4cZ/AxeE1tgZKWPqCpSeg==, tarball: file:projects/workspace-service.tgz}
name: '@rush-temp/workspace-service'
version: 0.0.0
dependencies:
'@koa/cors': 5.0.0
'@types/jest': 29.5.12
'@types/koa': 2.15.0
'@types/koa-bodyparser': 4.3.12
'@types/koa-router': 7.4.8
'@types/koa__cors': 5.0.0
'@types/node': 20.11.19
'@typescript-eslint/eslint-plugin': 6.21.0(@typescript-eslint/parser@6.21.0)(eslint@8.56.0)(typescript@5.3.3)
'@typescript-eslint/parser': 6.21.0(eslint@8.56.0)(typescript@5.3.3)
cross-env: 7.0.3
esbuild: 0.20.1
eslint: 8.56.0
eslint-config-standard-with-typescript: 40.0.0(@typescript-eslint/eslint-plugin@6.21.0)(eslint-plugin-import@2.29.1)(eslint-plugin-n@15.7.0)(eslint-plugin-promise@6.1.1)(eslint@8.56.0)(typescript@5.3.3)
eslint-plugin-import: 2.29.1(eslint@8.56.0)
eslint-plugin-n: 15.7.0(eslint@8.56.0)
eslint-plugin-promise: 6.1.1(eslint@8.56.0)
jest: 29.7.0(@types/node@20.11.19)(ts-node@10.9.2)
koa: 2.15.3
koa-bodyparser: 4.4.1
koa-router: 12.0.1
mongodb: 6.8.0
prettier: 3.2.5
ts-jest: 29.1.2(esbuild@0.20.1)(jest@29.7.0)(typescript@5.3.3)
ts-node: 10.9.2(@types/node@20.11.19)(typescript@5.3.3)
typescript: 5.3.3
transitivePeerDependencies:
- '@babel/core'
- '@jest/types'
- '@swc/core'
- '@swc/wasm'
- babel-jest
- babel-plugin-macros
- node-notifier
- supports-color
dev: false

View File

@ -76,6 +76,32 @@ services:
# - INIT_SCRIPT_URL=https://raw.githubusercontent.com/hcengineering/init/main/script.yaml
# - INIT_WORKSPACE=onboarding
restart: unless-stopped
workspace:
image: hardcoreeng/workspace
# deploy:
# mode: replicated
# replicas: 3
links:
- mongodb
- minio
volumes:
- ./branding.json:/var/cfg/branding.json
environment:
# - WS_OPERATION=create
- SERVER_SECRET=secret
- MONGO_URL=mongodb://mongodb:27017?compressors=snappy
- TRANSACTOR_URL=ws://transactor:3333;ws://localhost:3333
- SES_URL=
- STORAGE_CONFIG=${STORAGE_CONFIG}
- FRONT_URL=http://localhost:8087
- RESERVED_DB_NAMES=telegram,gmail,github
- MODEL_ENABLED=*
- ACCOUNTS_URL=http://account:3000
- BRANDING_PATH=/var/cfg/branding.json
- NOTIFY_INBOX_ONLY=true
# - INIT_SCRIPT_URL=https://raw.githubusercontent.com/hcengineering/init/main/script.yaml
# - INIT_WORKSPACE=onboarding
restart: unless-stopped
collaborator:
image: hardcoreeng/collaborator
extra_hosts:

View File

@ -55,6 +55,7 @@
"dependencies": {
"@elastic/elasticsearch": "^7.14.0",
"@hcengineering/account": "^0.6.0",
"@hcengineering/workspace-service": "^0.6.0",
"@hcengineering/attachment": "^0.6.14",
"@hcengineering/calendar": "^0.6.24",
"@hcengineering/chunter": "^0.6.20",

View File

@ -19,7 +19,6 @@ import accountPlugin, {
assignWorkspace,
confirmEmail,
createAcc,
createWorkspace,
dropAccount,
dropWorkspace,
dropWorkspaceFull,
@ -32,10 +31,11 @@ import accountPlugin, {
replacePassword,
setAccountAdmin,
setRole,
UpgradeWorker,
upgradeWorkspace,
updateWorkspace,
createWorkspace as createWorkspaceRecord,
type Workspace
} from '@hcengineering/account'
import { createWorkspace, upgradeWorkspace } from '@hcengineering/workspace-service'
import { setMetadata } from '@hcengineering/platform'
import {
backup,
@ -48,7 +48,8 @@ import {
} from '@hcengineering/server-backup'
import serverClientPlugin, { BlobClient, createClient, getTransactorEndpoint } from '@hcengineering/server-client'
import serverToken, { decodeToken, generateToken } from '@hcengineering/server-token'
import toolPlugin from '@hcengineering/server-tool'
import toolPlugin, { FileModelLogger } from '@hcengineering/server-tool'
import path from 'path'
import { buildStorageFromConfig, storageConfigFromEnv } from '@hcengineering/server-storage'
import { program, type Command } from 'commander'
@ -327,19 +328,28 @@ export function devTool (
.action(async (workspace, cmd: { email: string, workspaceName: string, init?: string, branding?: string }) => {
const { mongodbUri, txes, version, migrateOperations } = prepareTools()
await withDatabase(mongodbUri, async (db) => {
await createWorkspace(
toolCtx,
version,
txes,
migrateOperations,
db,
cmd.init !== undefined || cmd.branding !== undefined
? { initWorkspace: cmd.init, key: cmd.branding ?? 'huly' }
: null,
cmd.email,
cmd.workspaceName,
workspace
)
const measureCtx = new MeasureMetricsContext('create-workspace', {})
const brandingObj =
cmd.branding !== undefined || cmd.init !== undefined ? { key: cmd.branding, initWorkspace: cmd.init } : null
const wsInfo = await createWorkspaceRecord(measureCtx, db, brandingObj, cmd.email, cmd.workspaceName, workspace)
// update the record so it's not taken by one of the workers for the next 60 seconds
await updateWorkspace(db, wsInfo, {
mode: 'creating',
progress: 0,
lastProcessingTime: Date.now() + 1000 * 60
})
await createWorkspace(measureCtx, version, brandingObj, wsInfo, txes, migrateOperations)
await updateWorkspace(db, wsInfo, {
mode: 'active',
progress: 100,
disabled: false,
version
})
console.log('create-workspace done')
})
})
@ -387,30 +397,36 @@ export function devTool (
throw new Error(`workspace ${workspace} not found`)
}
const measureCtx = new MeasureMetricsContext('upgrade', {})
const measureCtx = new MeasureMetricsContext('upgrade-workspace', {})
await upgradeWorkspace(
measureCtx,
version,
txes,
migrateOperations,
db,
info.workspaceUrl ?? info.workspace,
info,
consoleModelLogger,
async () => {},
cmd.force,
cmd.indexes
cmd.indexes,
true
)
console.log(metricsToString(measureCtx.metrics, 'upgrade', 60), {})
console.log('upgrade done')
await updateWorkspace(db, info, {
mode: 'active',
progress: 100,
version
})
console.log(metricsToString(measureCtx.metrics, 'upgrade', 60))
console.log('upgrade-workspace done')
})
})
program
.command('upgrade')
.description('upgrade')
.option('-p|--parallel <parallel>', 'Parallel upgrade', '0')
.option('-l|--logs <logs>', 'Default logs folder', './logs')
.option('-r|--retry <retry>', 'Number of apply retries', '0')
.option('-i|--ignore [ignore]', 'Ignore workspaces', '')
.option(
'-c|--console',
@ -418,29 +434,45 @@ export function devTool (
false
)
.option('-f|--force [force]', 'Force update', false)
.action(
async (cmd: {
parallel: string
logs: string
retry: string
force: boolean
console: boolean
ignore: string
}) => {
const { mongodbUri, version, txes, migrateOperations } = prepareTools()
await withDatabase(mongodbUri, async (db, client) => {
const worker = new UpgradeWorker(db, client, version, txes, migrateOperations)
await worker.upgradeAll(toolCtx, {
errorHandler: async (ws, err) => {},
force: cmd.force,
console: cmd.console,
logs: cmd.logs,
parallel: parseInt(cmd.parallel ?? '1'),
ignore: cmd.ignore
})
})
}
)
.action(async (cmd: { logs: string, force: boolean, console: boolean, ignore: string }) => {
const { mongodbUri, version, txes, migrateOperations } = prepareTools()
await withDatabase(mongodbUri, async (db, client) => {
const workspaces = (await listWorkspacesRaw(db)).filter((ws) => !cmd.ignore.includes(ws.workspace))
workspaces.sort((a, b) => b.lastVisit - a.lastVisit)
const measureCtx = new MeasureMetricsContext('upgrade', {})
for (const ws of workspaces) {
try {
const logger = cmd.console
? consoleModelLogger
: new FileModelLogger(path.join(cmd.logs, `${ws.workspace}.log`))
await upgradeWorkspace(
measureCtx,
version,
txes,
migrateOperations,
ws,
logger,
async () => {},
cmd.force,
false,
true
)
await updateWorkspace(db, ws, {
mode: 'active',
progress: 100,
version
})
} catch (err: any) {
console.error(err)
}
}
console.log('upgrade done')
})
})
program
.command('list-unused-workspaces')

View File

@ -652,6 +652,8 @@ export interface DomainIndexConfiguration extends Doc {
skip?: string[]
}
export type WorkspaceMode = 'pending-creation' | 'creating' | 'upgrading' | 'deleting' | 'active'
export interface BaseWorkspaceInfo {
workspace: string // An uniq workspace name, Database names
disabled?: boolean
@ -665,8 +667,8 @@ export interface BaseWorkspaceInfo {
createdBy: string
creating?: boolean
createProgress?: number // Some progress
mode: WorkspaceMode
progress?: number // Some progress
endpoint: string
}

View File

@ -47,6 +47,7 @@ import { TxOperations } from './operations'
import { isPredicate } from './predicate'
import { DocumentQuery, FindResult } from './storage'
import { DOMAIN_TX } from './tx'
import { Branding, BrandingMap } from './server'
function toHex (value: number, chars: number): string {
const result = value.toString(16)
@ -807,3 +808,9 @@ export function isOwnerOrMaintainer (): boolean {
export function hasAccountRole (acc: Account, targerRole: AccountRole): boolean {
return roleOrder[acc.role] >= roleOrder[targerRole]
}
export function getBranding (brandings: BrandingMap, key: string | undefined): Branding | null {
if (key === undefined) return null
return Object.values(brandings).find((branding) => branding.key === key) ?? null
}

View File

@ -125,8 +125,8 @@
<div class="flex flex-col flex-grow">
<span class="label overflow-label flex-center">
{wsName}
{#if workspace.creating === true}
({workspace.createProgress}%)
{#if workspace.mode === 'creating'}
({workspace.progress}%)
{/if}
</span>
{#if isAdmin && wsName !== workspace.workspace}

View File

@ -13,7 +13,7 @@
// limitations under the License.
//
import { AccountRole, Doc, Ref, Timestamp } from '@hcengineering/core'
import { AccountRole, Doc, Ref, Timestamp, WorkspaceMode } from '@hcengineering/core'
import type { Asset, IntlString, Metadata, Plugin, Resource, Status } from '@hcengineering/platform'
import { plugin } from '@hcengineering/platform'
import type { AnyComponent } from '@hcengineering/ui'
@ -31,8 +31,8 @@ export interface Workspace {
workspaceName?: string // A company name
workspaceId: string // A unique identifier for the workspace
creating?: boolean
createProgress?: number
mode?: WorkspaceMode
progress?: number
lastVisit: number
}
@ -43,8 +43,8 @@ export interface Workspace {
export interface WorkspaceLoginInfo extends LoginInfo {
workspace: string
workspaceId: string
creating?: boolean
createProgress?: number
mode?: WorkspaceMode
progress?: number
}
/**

View File

@ -57,7 +57,7 @@
{#key $location.path[1]}
{#await connect(getMetadata(workbenchRes.metadata.PlatformTitle) ?? 'Platform')}
<Loading>
{#if ($workspaceCreating ?? -1) > 0}
{#if ($workspaceCreating ?? -1) >= 0}
<div class="ml-1">
<Label label={workbenchRes.string.WorkspaceCreating} />
{$workspaceCreating} %

View File

@ -83,13 +83,13 @@ export async function connect (title: string): Promise<Client | undefined> {
setMetadata(presentation.metadata.Token, token)
if (workspaceLoginInfo?.creating === true) {
if (['pending-creation', 'creating'].includes(workspaceLoginInfo?.mode ?? '')) {
const fetchWorkspace = await getResource(login.function.FetchWorkspace)
let loginInfo = await ctx.with('fetch-workspace', {}, async () => (await fetchWorkspace(ws))[1])
if (loginInfo?.creating === true) {
if (['pending-creation', 'creating'].includes(loginInfo?.mode ?? '')) {
while (true) {
if (ws !== getCurrentLocation().path[1]) return
workspaceCreating.set(loginInfo?.createProgress ?? 0)
workspaceCreating.set(loginInfo?.progress ?? 0)
loginInfo = await ctx.with('fetch-workspace', {}, async () => (await fetchWorkspace(ws))[1])
if (loginInfo === undefined) {
// something went wrong, workspace not exist, redirect to login
@ -98,8 +98,8 @@ export async function connect (title: string): Promise<Client | undefined> {
})
return
}
workspaceCreating.set(loginInfo?.createProgress)
if (loginInfo?.creating === false) {
workspaceCreating.set(loginInfo?.progress)
if (!['pending-creation', 'creating'].includes(loginInfo?.mode ?? '')) {
workspaceCreating.set(-1)
break
}

View File

@ -2,19 +2,13 @@
// Copyright © 2023 Hardcore Engineering Inc.
//
import { Analytics } from '@hcengineering/analytics'
import { MeasureMetricsContext, metricsToString, newMetrics, type Tx } from '@hcengineering/core'
import { MeasureMetricsContext, metricsToString, newMetrics } from '@hcengineering/core'
import { loadBrandingMap } from '@hcengineering/server-core'
import { configureAnalytics, SplitLogger } from '@hcengineering/analytics-service'
import builder, { getModelVersion, migrateOperations } from '@hcengineering/model-all'
import { writeFile } from 'fs/promises'
import { join } from 'path'
import { serveAccount } from '@hcengineering/account-service'
const enabled = (process.env.MODEL_ENABLED ?? '*').split(',').map((it) => it.trim())
const disabled = (process.env.MODEL_DISABLED ?? '').split(',').map((it) => it.trim())
const txes = JSON.parse(JSON.stringify(builder(enabled, disabled).getTxes())) as Tx[]
configureAnalytics(process.env.SENTRY_DSN, {})
Analytics.setTag('application', 'account')
@ -43,6 +37,6 @@ const intTimer = setInterval(() => {
const brandingPath = process.env.BRANDING_PATH
serveAccount(metricsContext, getModelVersion(), txes, migrateOperations, loadBrandingMap(brandingPath), () => {
serveAccount(metricsContext, loadBrandingMap(brandingPath), () => {
clearInterval(intTimer)
})

View File

@ -1,11 +1,11 @@
import { joinWithProvider, loginWithProvider, type LoginInfo } from '@hcengineering/account'
import { BrandingMap, concatLink, MeasureContext } from '@hcengineering/core'
import { BrandingMap, concatLink, MeasureContext, getBranding } from '@hcengineering/core'
import Router from 'koa-router'
import { Db } from 'mongodb'
import { Strategy as GitHubStrategy } from 'passport-github2'
import qs from 'querystringify'
import { Passport } from '.'
import { getBranding, getHost, safeParseAuthState } from './utils'
import { getHost, safeParseAuthState } from './utils'
export function registerGithub (
measureCtx: MeasureContext,

View File

@ -1,11 +1,11 @@
import { joinWithProvider, LoginInfo, loginWithProvider } from '@hcengineering/account'
import { BrandingMap, concatLink, MeasureContext } from '@hcengineering/core'
import { BrandingMap, concatLink, MeasureContext, getBranding } from '@hcengineering/core'
import Router from 'koa-router'
import { Db } from 'mongodb'
import { Strategy as GoogleStrategy } from 'passport-google-oauth20'
import qs from 'querystringify'
import { Passport } from '.'
import { getBranding, getHost, safeParseAuthState } from './utils'
import { getHost, safeParseAuthState } from './utils'
export function registerGoogle (
measureCtx: MeasureContext,

View File

@ -1,11 +1,11 @@
import { getAccountInfoByToken } from '@hcengineering/account'
import { BrandingMap, concatLink, MeasureContext } from '@hcengineering/core'
import { BrandingMap, concatLink, MeasureContext, getBranding } from '@hcengineering/core'
import Router from 'koa-router'
import { Db } from 'mongodb'
import qs from 'querystringify'
import { Strategy as CustomStrategy } from 'passport-custom'
import { Passport } from '.'
import { getBranding, getHost, safeParseAuthState } from './utils'
import { getHost, safeParseAuthState } from './utils'
export function registerToken (
measureCtx: MeasureContext,

View File

@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
//
import { Branding, BrandingMap } from '@hcengineering/core'
import { IncomingHttpHeaders } from 'http'
export function getHost (headers: IncomingHttpHeaders): string | undefined {
@ -25,12 +24,6 @@ export function getHost (headers: IncomingHttpHeaders): string | undefined {
return host
}
export function getBranding (brandings: BrandingMap, key: string | undefined): Branding | null {
if (key === undefined) return null
return Object.values(brandings).find((branding) => branding.key === key) ?? null
}
export interface AuthState {
inviteId?: string
branding?: string

View File

@ -0,0 +1,7 @@
module.exports = {
extends: ['./node_modules/@hcengineering/platform-rig/profiles/node/eslint.config.json'],
parserOptions: {
tsconfigRootDir: __dirname,
project: './tsconfig.json'
}
}

View File

@ -0,0 +1,4 @@
*
!/lib/**
!CHANGELOG.md
/lib/**/__tests__/

15
pods/workspace/Dockerfile Normal file
View File

@ -0,0 +1,15 @@
FROM node:20
WORKDIR /usr/src/app
RUN npm install --ignore-scripts=false --verbose bufferutil utf-8-validate @mongodb-js/zstd snappy --unsafe-perm
RUN apt-get update
RUN apt-get install libjemalloc2
ENV LD_PRELOAD=libjemalloc.so.2
ENV MALLOC_CONF=dirty_decay_ms:1000,narenas:2,background_thread:true
COPY bundle/bundle.js ./
CMD [ "node", "bundle.js" ]

20
pods/workspace/build.sh Executable file
View File

@ -0,0 +1,20 @@
#!/usr/bin/env bash
#
# Copyright © 2020, 2021 Anticrm Platform Contributors.
# Copyright © 2021 Hardcore Engineering Inc.
#
# Licensed under the Eclipse Public License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License. You may
# obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#
# See the License for the specific language governing permissions and
# limitations under the License.
#
rushx bundle
rushx docker:build
rushx docker:push

View File

@ -0,0 +1,5 @@
{
"$schema": "https://developer.microsoft.com/json-schemas/rig-package/rig.schema.json",
"rigPackageName": "@hcengineering/platform-rig",
"rigProfile": "node"
}

View File

@ -0,0 +1,7 @@
module.exports = {
preset: 'ts-jest',
testEnvironment: 'node',
testMatch: ['**/?(*.)+(spec|test).[jt]s?(x)'],
roots: ["./src"],
coverageReporters: ["text-summary", "html"]
}

View File

@ -0,0 +1,61 @@
{
"name": "@hcengineering/pod-workspace",
"version": "0.6.0",
"main": "lib/index.js",
"svelte": "src/index.ts",
"types": "types/index.d.ts",
"author": "Anticrm Platform Contributors",
"template": "@hcengineering/node-package",
"license": "EPL-2.0",
"scripts": {
"start": "ts-node src/__start.ts",
"build": "compile",
"build:watch": "compile",
"_phase:bundle": "rushx bundle",
"_phase:docker-build": "rushx docker:build",
"_phase:docker-staging": "rushx docker:staging",
"bundle": "mkdir -p bundle && esbuild src/__start.ts --sourcemap=inline --external:*.node --external:snappy --bundle --define:process.env.MODEL_VERSION=$(node ../../common/scripts/show_version.js) --minify --platform=node > bundle/bundle.js",
"docker:build": "../../common/scripts/docker_build.sh hardcoreeng/workspace",
"docker:tbuild": "docker build -t hardcoreeng/workspace . --platform=linux/amd64 && ../../common/scripts/docker_tag_push.sh hardcoreeng/workspace",
"docker:abuild": "docker build -t hardcoreeng/workspace . --platform=linux/arm64 && ../../common/scripts/docker_tag_push.sh hardcoreeng/workspace",
"docker:staging": "../../common/scripts/docker_tag.sh hardcoreeng/workspace staging",
"docker:push": "../../common/scripts/docker_tag.sh hardcoreeng/workspace",
"run-local": "cross-env MONGO_URL=mongodb://localhost:27017 MINIO_ACCESS_KEY=minioadmi MINIO_SECRET_KEY=minioadmin MINIO_ENDPOINT=localhost SERVER_SECRET='secret' TRANSACTOR_URL=ws://localhost:3333 ts-node src/__start.ts",
"format": "format src",
"test": "jest --passWithNoTests --silent --forceExit",
"_phase:build": "compile transpile src",
"_phase:test": "jest --passWithNoTests --silent --forceExit",
"_phase:format": "format src",
"_phase:validate": "compile validate"
},
"devDependencies": {
"cross-env": "~7.0.3",
"@hcengineering/platform-rig": "^0.6.0",
"@types/node": "~20.11.16",
"@typescript-eslint/eslint-plugin": "^6.11.0",
"eslint-plugin-import": "^2.26.0",
"eslint-plugin-promise": "^6.1.1",
"eslint-plugin-n": "^15.4.0",
"eslint": "^8.54.0",
"esbuild": "^0.20.0",
"@typescript-eslint/parser": "^6.11.0",
"eslint-config-standard-with-typescript": "^40.0.0",
"prettier": "^3.1.0",
"ts-node": "^10.8.0",
"typescript": "^5.3.3",
"jest": "^29.7.0",
"ts-jest": "^29.1.1",
"@types/jest": "^29.5.5"
},
"dependencies": {
"@hcengineering/workspace-service": "^0.6.0",
"@hcengineering/platform": "^0.6.11",
"@hcengineering/core": "^0.6.32",
"mongodb": "^6.8.0",
"@hcengineering/server-token": "^0.6.11",
"@hcengineering/server-core": "^0.6.1",
"@hcengineering/model-all": "^0.6.0",
"@hcengineering/analytics": "^0.6.0",
"@hcengineering/analytics-service": "^0.6.0"
}
}

View File

@ -0,0 +1,48 @@
//
// Copyright © 2023 Hardcore Engineering Inc.
//
import { Analytics } from '@hcengineering/analytics'
import { MeasureMetricsContext, metricsToString, newMetrics, type Tx } from '@hcengineering/core'
import { loadBrandingMap } from '@hcengineering/server-core'
import { configureAnalytics, SplitLogger } from '@hcengineering/analytics-service'
import builder, { getModelVersion, migrateOperations } from '@hcengineering/model-all'
import { writeFile } from 'fs/promises'
import { join } from 'path'
import { serveWorkspaceAccount } from '@hcengineering/workspace-service'
const enabled = (process.env.MODEL_ENABLED ?? '*').split(',').map((it) => it.trim())
const disabled = (process.env.MODEL_DISABLED ?? '').split(',').map((it) => it.trim())
const txes = JSON.parse(JSON.stringify(builder(enabled, disabled).getTxes())) as Tx[]
configureAnalytics(process.env.SENTRY_DSN, {})
Analytics.setTag('application', 'workspace')
const metricsContext = new MeasureMetricsContext(
'workspace',
{},
{},
newMetrics(),
new SplitLogger('workspace', {
root: join(process.cwd(), 'logs'),
enableConsole: (process.env.ENABLE_CONSOLE ?? 'true') === 'true'
})
)
let oldMetricsValue = ''
const intTimer = setInterval(() => {
const val = metricsToString(metricsContext.metrics, 'Workspace', 140)
if (val !== oldMetricsValue) {
oldMetricsValue = val
void writeFile('metrics.txt', val).catch((err) => {
console.error(err)
})
}
}, 30000)
const brandingPath = process.env.BRANDING_PATH
serveWorkspaceAccount(metricsContext, getModelVersion(), txes, migrateOperations, loadBrandingMap(brandingPath), () => {
clearInterval(intTimer)
})

View File

@ -0,0 +1,10 @@
{
"extends": "./node_modules/@hcengineering/platform-rig/profiles/node/tsconfig.json",
"compilerOptions": {
"rootDir": "./src",
"outDir": "./lib",
"declarationDir": "./types",
"tsBuildInfoFile": ".build/build.tsbuildinfo"
}
}

View File

@ -54,6 +54,23 @@ services:
- STORAGE_CONFIG=${STORAGE_CONFIG}
- MODEL_ENABLED=*
- BRANDING_PATH=/var/cfg/branding-test.json
workspace:
image: hardcoreeng/workspace
links:
- mongodb
- minio
volumes:
- ./branding-test.json:/var/cfg/branding.json
environment:
- SERVER_SECRET=secret
- MONGO_URL=mongodb://mongodb:27018
- TRANSACTOR_URL=ws://transactor:3334;ws://localhost:3334
- STORAGE_CONFIG=${STORAGE_CONFIG}
- MODEL_ENABLED=*
- ACCOUNTS_URL=http://account:3003
- BRANDING_PATH=/var/cfg/branding.json
- NOTIFY_INBOX_ONLY=true
restart: unless-stopped
front:
image: hardcoreeng/front
pull_policy: never

View File

@ -871,6 +871,11 @@
"projectFolder": "server/account-service",
"shouldPublish": false
},
{
"packageName": "@hcengineering/workspace-service",
"projectFolder": "server/workspace-service",
"shouldPublish": false
},
{
"packageName": "@hcengineering/collaborator",
"projectFolder": "server/collaborator",
@ -886,6 +891,11 @@
"projectFolder": "pods/account",
"shouldPublish": false
},
{
"packageName": "@hcengineering/pod-workspace",
"projectFolder": "pods/workspace",
"shouldPublish": false
},
{
"packageName": "@hcengineering/pod-collaborator",
"projectFolder": "pods/collaborator",

View File

@ -198,7 +198,11 @@ export async function isShouldNotifyTx (
const types = getMatchedTypes(control, tx, originTx, isOwn, isSpace, docUpdateMessage?.attributeUpdates?.attrKey)
const modifiedAccount = getPersonAccountById(tx.modifiedBy, control)
const result = new Map<Ref<NotificationProvider>, BaseNotificationType[]>()
const providers = control.modelDb.findAllSync(notification.class.NotificationProvider, {})
let providers: NotificationProvider[] = control.modelDb.findAllSync(notification.class.NotificationProvider, {})
if (process.env.NOTIFY_INBOX_ONLY === 'true') {
providers = providers.filter((it) => it._id === notification.providers.InboxNotificationProvider)
}
for (const type of types) {
if (

View File

@ -5,10 +5,8 @@
import account, {
ACCOUNT_DB,
EndpointKind,
UpgradeWorker,
accountId,
cleanExpiredOtp,
cleanInProgressWorkspaces,
getAllTransactors,
getMethods
} from '@hcengineering/account'
@ -16,15 +14,7 @@ import accountEn from '@hcengineering/account/lang/en.json'
import accountRu from '@hcengineering/account/lang/ru.json'
import { Analytics } from '@hcengineering/analytics'
import { registerProviders } from '@hcengineering/auth-providers'
import {
metricsAggregate,
type BrandingMap,
type Data,
type MeasureContext,
type Tx,
type Version
} from '@hcengineering/core'
import { type MigrateOperation } from '@hcengineering/model'
import { metricsAggregate, type BrandingMap, type MeasureContext } from '@hcengineering/core'
import { getMongoClient, type MongoClientReference } from '@hcengineering/mongo'
import platform, { Severity, Status, addStringsLoader, setMetadata } from '@hcengineering/platform'
import serverClientPlugin from '@hcengineering/server-client'
@ -41,16 +31,9 @@ import os from 'os'
/**
* @public
*/
export function serveAccount (
measureCtx: MeasureContext,
version: Data<Version>,
txes: Tx[],
migrateOperations: [string, MigrateOperation][],
brandings: BrandingMap,
onClose?: () => void
): void {
export function serveAccount (measureCtx: MeasureContext, brandings: BrandingMap, onClose?: () => void): void {
console.log('Starting account service with brandings: ', brandings)
const methods = getMethods(version, txes, migrateOperations)
const methods = getMethods()
const ACCOUNT_PORT = parseInt(process.env.ACCOUNT_PORT ?? '3000')
const dbUri = process.env.MONGO_URL
if (dbUri === undefined) {
@ -109,8 +92,6 @@ export function serveAccount (
const client: MongoClientReference = getMongoClient(dbUri)
let _client: MongoClient | Promise<MongoClient> = client.getClient()
let worker: UpgradeWorker | undefined
const app = new Koa()
const router = new Router()
@ -125,31 +106,12 @@ export function serveAccount (
const db = p.db(ACCOUNT_DB)
registerProviders(measureCtx, app, router, db, serverSecret, frontURL, brandings)
// We need to clean workspace with creating === true, since server is restarted.
void cleanInProgressWorkspaces(db)
setInterval(
() => {
void cleanExpiredOtp(db)
},
3 * 60 * 1000
)
const performUpgrade = (process.env.PERFORM_UPGRADE ?? 'true') === 'true'
if (performUpgrade) {
await measureCtx.with('upgrade-all-models', {}, async (ctx) => {
worker = new UpgradeWorker(db, p, version, txes, migrateOperations)
await worker.upgradeAll(ctx, {
errorHandler: async (ws, err) => {
Analytics.handleError(err)
},
force: false,
console: false,
logs: 'upgrade-logs',
parallel: parseInt(process.env.PARALLEL ?? '1')
})
})
}
})
const extractToken = (header: IncomingHttpHeaders): string | undefined => {
@ -255,7 +217,6 @@ export function serveAccount (
async (ctx) => await method(ctx, db, branding, request, token)
)
worker?.updateResponseStatistics(result)
ctx.body = result
})

View File

@ -14,7 +14,6 @@
// limitations under the License.
//
import builder, { migrateOperations, getModelVersion } from '@hcengineering/model-all'
import { randomBytes } from 'crypto'
import { Db, MongoClient } from 'mongodb'
import accountPlugin, { getAccount, getMethods, getWorkspaceByUrl } from '../operations'
@ -23,7 +22,7 @@ import { MeasureMetricsContext } from '@hcengineering/core'
const DB_NAME = 'test_accounts'
const methods = getMethods(getModelVersion(), builder().getTxes(), migrateOperations)
const methods = getMethods()
const metricsContext = new MeasureMetricsContext('account', {})

View File

@ -17,6 +17,4 @@ import { accountPlugin } from './plugin'
export * from './operations'
export * from './plugin'
export * from './service'
export default accountPlugin

File diff suppressed because it is too large Load Diff

View File

@ -1,197 +0,0 @@
//
// Copyright © 2024 Hardcore Engineering Inc.
//
// Licensed under the Eclipse Public License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License. You may
// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//
// See the License for the specific language governing permissions and
// limitations under the License.
//
import { BaseWorkspaceInfo, Data, RateLimiter, Tx, Version, type MeasureContext } from '@hcengineering/core'
import { MigrateOperation, ModelLogger } from '@hcengineering/model'
import { FileModelLogger } from '@hcengineering/server-tool'
import { Db, MongoClient } from 'mongodb'
import path from 'path'
import {
Workspace,
WorkspaceInfo,
listWorkspacesRaw,
updateWorkspace,
upgradeWorkspace,
clearWorkspaceProductId
} from './operations'
import { Analytics } from '@hcengineering/analytics'
export type UpgradeErrorHandler = (workspace: BaseWorkspaceInfo, error: any) => Promise<void>
export interface UpgradeOptions {
errorHandler: (workspace: BaseWorkspaceInfo, error: any) => Promise<void>
force: boolean
console: boolean
logs: string
parallel: number
ignore?: string
}
export class UpgradeWorker {
constructor (
readonly db: Db,
readonly client: MongoClient,
readonly version: Data<Version>,
readonly txes: Tx[],
readonly migrationOperation: [string, MigrateOperation][]
) {}
canceled = false
st: number = Date.now()
total: number = 0
toProcess: number = 0
eta: number = 0
updateResponseStatistics (response: any): void {
response.upgrade = {
toProcess: this.toProcess,
total: this.total,
elapsed: Date.now() - this.st,
eta: this.eta
}
}
async close (): Promise<void> {
this.canceled = true
}
private async _upgradeWorkspace (ctx: MeasureContext, ws: WorkspaceInfo, opt: UpgradeOptions): Promise<void> {
if (ws.disabled === true || (opt.ignore ?? '').includes(ws.workspace)) {
return
}
const t = Date.now()
const ctxModelLogger: ModelLogger = {
log (msg: string, data: any): void {
ctx.info(msg, data)
},
error (msg: string, data: any): void {
ctx.error(msg, data)
}
}
const logger = opt.console ? ctxModelLogger : new FileModelLogger(path.join(opt.logs, `${ws.workspace}.log`))
const avgTime = (Date.now() - this.st) / (this.total - this.toProcess + 1)
this.eta = Math.floor(avgTime * this.toProcess)
ctx.info('----------------------------------------------------------\n---UPGRADING----', {
pending: this.toProcess,
eta: this.eta,
workspace: ws.workspace
})
this.toProcess--
try {
const version = await upgradeWorkspace(
ctx,
this.version,
this.txes,
this.migrationOperation,
this.db,
ws.workspaceUrl ?? ws.workspace,
logger,
opt.force
)
ctx.info('---done---------', {
pending: this.toProcess,
time: Date.now() - t,
workspace: ws.workspace,
version
})
} catch (err: any) {
await opt.errorHandler(ws, err)
logger.log('error', err)
if (!opt.console) {
ctx.error('error', { err })
}
Analytics.handleError(err)
ctx.info('---failed---------', {
pending: this.toProcess,
time: Date.now() - t,
workspace: ws.workspace
})
} finally {
if (!opt.console) {
;(logger as FileModelLogger).close()
}
}
}
async upgradeAll (ctx: MeasureContext, opt: UpgradeOptions): Promise<void> {
const workspaces = await ctx.with('retrieve-workspaces', {}, async (ctx) => await listWorkspacesRaw(this.db))
workspaces.sort((a, b) => b.lastVisit - a.lastVisit)
for (const ws of workspaces) {
// We need to update workspaces with missing workspaceUrl
if (ws.workspaceUrl == null) {
const upd: Partial<Workspace> = {
workspaceUrl: ws.workspace
}
if (ws.workspaceName == null) {
upd.workspaceName = ws.workspace
}
await updateWorkspace(this.db, ws, upd)
}
// We need to drop productId from workspace
if ((ws as any).productId !== undefined) {
await clearWorkspaceProductId(this.db, ws)
}
}
const withError: string[] = []
this.toProcess = workspaces.length
this.st = Date.now()
this.total = workspaces.length
if (opt.parallel > 1) {
const parallel = opt.parallel
const rateLimit = new RateLimiter(parallel)
ctx.info('parallel upgrade', { parallel })
for (const it of workspaces) {
await rateLimit.add(async () => {
try {
await ctx.with('do-upgrade', {}, async (ctx) => {
await this._upgradeWorkspace(ctx, it, opt)
})
} catch (err: any) {
ctx.error('Failed to update', { err })
Analytics.handleError(err)
}
})
}
await rateLimit.waitProcessing()
ctx.info('Upgrade done')
} else {
ctx.info('UPGRADE write logs at:', { logs: opt.logs })
for (const ws of workspaces) {
try {
await this._upgradeWorkspace(ctx, ws, opt)
} catch (err: any) {
ctx.error('Failed to update', { err })
Analytics.handleError(err)
}
}
if (withError.length > 0) {
ctx.info('Failed workspaces', withError)
}
}
}
}

View File

@ -0,0 +1,157 @@
//
// Copyright © 2024 Hardcore Engineering Inc.
//
// Licensed under the Eclipse Public License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License. You may
// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//
// See the License for the specific language governing permissions and
// limitations under the License.
//
import { type BaseWorkspaceInfo, type Data, type Version } from '@hcengineering/core'
import { getMetadata, PlatformError, unknownError } from '@hcengineering/platform'
import plugin from './plugin'
export async function listAccountWorkspaces (token: string): Promise<BaseWorkspaceInfo[]> {
const accountsUrl = getMetadata(plugin.metadata.Endpoint)
if (accountsUrl == null) {
throw new PlatformError(unknownError('No account endpoint specified'))
}
const workspaces = await (
await fetch(accountsUrl, {
method: 'POST',
headers: {
'Content-Type': 'application/json'
},
body: JSON.stringify({
method: 'listWorkspaces',
params: [token]
})
})
).json()
return (workspaces.result as BaseWorkspaceInfo[]) ?? []
}
export async function getTransactorEndpoint (
token: string,
kind: 'internal' | 'external' = 'internal',
timeout: number = -1
): Promise<string> {
const accountsUrl = getMetadata(plugin.metadata.Endpoint)
if (accountsUrl == null) {
throw new PlatformError(unknownError('No account endpoint specified'))
}
const st = Date.now()
while (true) {
try {
const workspaceInfo: { result: BaseWorkspaceInfo } = await (
await fetch(accountsUrl, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
Authorization: 'Bearer ' + token
},
body: JSON.stringify({
method: 'selectWorkspace',
params: ['', kind]
})
})
).json()
return workspaceInfo.result.endpoint
} catch (err: any) {
if (timeout > 0 && st + timeout < Date.now()) {
// Timeout happened
throw err
}
if (err?.cause?.code === 'ECONNRESET' || err?.cause?.code === 'ECONNREFUSED') {
await new Promise<void>((resolve) => setTimeout(resolve, 1000))
} else {
throw err
}
}
}
}
export async function getPendingWorkspace (
token: string,
region: string,
version: Data<Version>,
operation: 'create' | 'upgrade' | 'all'
): Promise<BaseWorkspaceInfo | undefined> {
const accountsUrl = getMetadata(plugin.metadata.Endpoint)
if (accountsUrl == null) {
throw new PlatformError(unknownError('No account endpoint specified'))
}
const workspaces = await (
await fetch(accountsUrl, {
method: 'POST',
headers: {
'Content-Type': 'application/json'
},
body: JSON.stringify({
method: 'getPendingWorkspace',
params: [token, region, version, operation]
})
})
).json()
return workspaces.result as BaseWorkspaceInfo
}
export async function updateWorkspaceInfo (
token: string,
workspaceId: string,
event: 'ping' | 'create-started' | 'upgrade-started' | 'progress' | 'create-done' | 'upgrade-done',
version: Data<Version>,
progress: number,
message?: string
): Promise<void> {
const accountsUrl = getMetadata(plugin.metadata.Endpoint)
if (accountsUrl == null) {
throw new PlatformError(unknownError('No account endpoint specified'))
}
await (
await fetch(accountsUrl, {
method: 'POST',
headers: {
'Content-Type': 'application/json'
},
body: JSON.stringify({
method: 'updateWorkspaceInfo',
params: [token, workspaceId, event, version, progress, message]
})
})
).json()
}
export async function workerHandshake (
token: string,
region: string,
version: Data<Version>,
operation: 'create' | 'upgrade' | 'all'
): Promise<void> {
const accountsUrl = getMetadata(plugin.metadata.Endpoint)
if (accountsUrl == null) {
throw new PlatformError(unknownError('No account endpoint specified'))
}
await fetch(accountsUrl, {
method: 'POST',
headers: {
'Content-Type': 'application/json'
},
body: JSON.stringify({
method: 'workerHandshake',
params: [token, region, version, operation]
})
})
}

225
server/client/src/blob.ts Normal file
View File

@ -0,0 +1,225 @@
//
// Copyright © 2024 Hardcore Engineering Inc.
//
// Licensed under the Eclipse Public License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License. You may
// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//
// See the License for the specific language governing permissions and
// limitations under the License.
//
import { WorkspaceId, type MeasureContext } from '@hcengineering/core'
import type { StorageAdapter } from '@hcengineering/server-core'
import { Buffer } from 'node:buffer'
// Will use temporary file to store huge content into
export class BlobClient {
transactorAPIUrl: string
index: number
constructor (
readonly transactorUrl: string,
readonly token: string,
readonly workspace: WorkspaceId,
readonly opt?: {
storageAdapter?: StorageAdapter
}
) {
this.index = 0
let url = transactorUrl
if (url.endsWith('/')) {
url = url.slice(0, url.length - 1)
}
this.transactorAPIUrl = url.replaceAll('wss://', 'https://').replace('ws://', 'http://') + '/api/v1/blob'
}
async checkFile (ctx: MeasureContext, name: string): Promise<boolean> {
if (this.opt?.storageAdapter !== undefined) {
const obj = await this.opt?.storageAdapter.stat(ctx, this.workspace, name)
if (obj !== undefined) {
return true
}
}
for (let i = 0; i < 5; i++) {
try {
const response = await fetch(this.transactorAPIUrl + `?name=${encodeURIComponent(name)}`, {
headers: {
Authorization: 'Bearer ' + this.token,
Range: 'bytes=0-1'
}
})
if (response.status === 404) {
return false
}
const buff = await response.arrayBuffer()
return buff.byteLength > 0
} catch (err: any) {
if (i === 4) {
ctx.error('Failed to check file', { name, error: err })
}
await new Promise<void>((resolve) => setTimeout(resolve, 500))
}
}
return false
}
async writeTo (
ctx: MeasureContext,
name: string,
size: number,
writable: {
write: (buffer: Buffer, cb: (err?: any) => void) => void
end: (cb: () => void) => void
}
): Promise<void> {
let written = 0
const chunkSize = 50 * 1024 * 1024
let writtenMb = 0
// Use ranges to iterave through file with retry if required.
while (written < size || size === -1) {
let i = 0
let response: Response | undefined
for (; i < 5; i++) {
try {
const st = Date.now()
let chunk: Buffer
if (this.opt?.storageAdapter !== undefined) {
const chunks: Buffer[] = []
const readable = await this.opt.storageAdapter.partial(ctx, this.workspace, name, written, chunkSize)
await new Promise<void>((resolve) => {
readable.on('data', (chunk) => {
chunks.push(chunk)
})
readable.on('end', () => {
resolve()
})
})
chunk = Buffer.concat(chunks)
} else {
const header: Record<string, string> = {
Authorization: 'Bearer ' + this.token
}
if (!(size !== -1 && written === 0 && size < chunkSize)) {
header.Range = `bytes=${written}-${size === -1 ? written + chunkSize : Math.min(size - 1, written + chunkSize)}`
}
response = await fetch(this.transactorAPIUrl + `?name=${encodeURIComponent(name)}`, { headers: header })
if (header.Range != null) {
ctx.info('fetch part', { time: Date.now() - st, blobId: name, written, size })
}
if (response.status === 403) {
i = 5
// No file, so make it empty
throw new Error(`Unauthorized ${this.transactorAPIUrl}/${this.workspace.name}/${name}`)
}
if (response.status === 404) {
i = 5
// No file, so make it empty
throw new Error(`No file for ${this.transactorAPIUrl}/${this.workspace.name}/${name}`)
}
if (response.status === 416) {
if (size === -1) {
size = parseInt((response.headers.get('content-range') ?? '').split('*/')[1])
continue
}
// No file, so make it empty
throw new Error(`No file for ${this.transactorAPIUrl}/${this.workspace.name}/${name}`)
}
chunk = Buffer.from(await response.arrayBuffer())
if (header.Range == null) {
size = chunk.length
}
// We need to parse
// 'Content-Range': `bytes ${start}-${end}/${size}`
// To determine if something is left
const range = response.headers.get('Content-Range')
if (range !== null) {
const [, total] = range.split(' ')[1].split('/')
if (total !== undefined) {
size = parseInt(total)
}
}
}
await new Promise<void>((resolve, reject) => {
writable.write(chunk, (err) => {
if (err != null) {
reject(err)
}
resolve()
})
})
written += chunk.length
const newWrittenMb = Math.round(written / (1024 * 1024))
const newWrittenId = Math.round(newWrittenMb / 5)
if (writtenMb !== newWrittenId) {
writtenMb = newWrittenId
ctx.info(' >>>>Chunk', {
name,
written: newWrittenMb,
of: Math.round(size / (1024 * 1024))
})
}
break
} catch (err: any) {
if (err?.code === 'NoSuchKey') {
ctx.info('No such key', { name })
return
}
if (i > 4) {
await new Promise<void>((resolve) => {
writable.end(resolve)
})
throw err
}
await new Promise<void>((resolve) => setTimeout(resolve, 1000))
// retry
}
}
}
await new Promise<void>((resolve) => {
writable.end(resolve)
})
}
async upload (ctx: MeasureContext, name: string, size: number, contentType: string, buffer: Buffer): Promise<void> {
if (this.opt?.storageAdapter !== undefined) {
await this.opt.storageAdapter.put(ctx, this.workspace, name, buffer, contentType, size)
} else {
// TODO: We need to improve this logig, to allow restore of huge blobs
for (let i = 0; i < 5; i++) {
try {
await fetch(
this.transactorAPIUrl +
`?name=${encodeURIComponent(name)}&contentType=${encodeURIComponent(contentType)}&size=${size}`,
{
method: 'PUT',
headers: {
Authorization: 'Bearer ' + this.token,
'Content-Type': contentType
},
body: buffer
}
)
break
} catch (err: any) {
if (i === 4) {
ctx.error('failed to upload file', { name })
throw err
}
}
}
}
}
}

View File

@ -0,0 +1,72 @@
//
// Copyright © 2024 Hardcore Engineering Inc.
//
// Licensed under the Eclipse Public License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License. You may
// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//
// See the License for the specific language governing permissions and
// limitations under the License.
//
import client, { clientId } from '@hcengineering/client'
import { Client, LoadModelResponse, Tx } from '@hcengineering/core'
import { addLocation, getMetadata, getResource, setMetadata } from '@hcengineering/platform'
import crypto from 'node:crypto'
import plugin from './plugin'
/**
* @public
*
* If connectTimeout is set, connect will try to connect only specified amount of time, and will return failure if failed.
*/
export async function createClient (
transactorUrl: string,
token: string,
model?: Tx[],
connectTimeout: number = 0
): Promise<Client> {
// We need to override default factory with 'ws' one.
// eslint-disable-next-line
const WebSocket = require('ws')
setMetadata(client.metadata.UseBinaryProtocol, true)
setMetadata(client.metadata.UseProtocolCompression, true)
setMetadata(client.metadata.ConnectionTimeout, connectTimeout)
setMetadata(client.metadata.ClientSocketFactory, (url) => {
const socket = new WebSocket(url, {
headers: {
'User-Agent': getMetadata(plugin.metadata.UserAgent) ?? 'Anticrm Client'
}
})
return socket
})
addLocation(clientId, () => import('@hcengineering/client-resources'))
if (model !== undefined) {
let prev = ''
const hashes = model.map((it) => {
const h = crypto.createHash('sha1')
h.update(prev)
h.update(JSON.stringify(it))
prev = h.digest('hex')
return prev
})
setMetadata(client.metadata.OverridePersistenceStore, {
load: async () => ({
hash: hashes[hashes.length - 1],
transactions: model,
full: true
}),
store: async (model: LoadModelResponse) => {}
})
}
const clientFactory = await getResource(client.function.GetClient)
return await clientFactory(token, transactorUrl)
}

View File

@ -14,347 +14,10 @@
// limitations under the License.
//
import client, { clientId } from '@hcengineering/client'
import {
Client,
LoadModelResponse,
Tx,
WorkspaceId,
type BaseWorkspaceInfo,
type MeasureContext
} from '@hcengineering/core'
import {
addLocation,
getMetadata,
getResource,
PlatformError,
setMetadata,
unknownError
} from '@hcengineering/platform'
import type { StorageAdapter } from '@hcengineering/server-core'
import { Buffer } from 'node:buffer'
import crypto from 'node:crypto'
import plugin from './plugin'
export default plugin
export async function listAccountWorkspaces (token: string): Promise<BaseWorkspaceInfo[]> {
const accountsUrl = getMetadata(plugin.metadata.Endpoint)
if (accountsUrl == null) {
throw new PlatformError(unknownError('No endpoint specified'))
}
const workspaces = await (
await fetch(accountsUrl, {
method: 'POST',
headers: {
'Content-Type': 'application/json'
},
body: JSON.stringify({
method: 'listWorkspaces',
params: [token]
})
})
).json()
return (workspaces.result as BaseWorkspaceInfo[]) ?? []
}
export async function getTransactorEndpoint (
token: string,
kind: 'internal' | 'external' = 'internal',
timeout: number = -1
): Promise<string> {
const accountsUrl = getMetadata(plugin.metadata.Endpoint)
if (accountsUrl == null) {
throw new PlatformError(unknownError('No endpoint specified'))
}
const st = Date.now()
while (true) {
try {
const workspaceInfo: { result: BaseWorkspaceInfo } = await (
await fetch(accountsUrl, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
Authorization: 'Bearer ' + token
},
body: JSON.stringify({
method: 'selectWorkspace',
params: ['', kind]
})
})
).json()
return workspaceInfo.result.endpoint
} catch (err: any) {
if (timeout > 0 && st + timeout < Date.now()) {
// Timeout happened
throw err
}
if (err?.cause?.code === 'ECONNRESET' || err?.cause?.code === 'ECONNREFUSED') {
await new Promise<void>((resolve) => setTimeout(resolve, 1000))
} else {
throw err
}
}
}
}
/**
* @public
*
* If connectTimeout is set, connect will try to connect only specified amount of time, and will return failure if failed.
*/
export async function createClient (
transactorUrl: string,
token: string,
model?: Tx[],
connectTimeout: number = 0
): Promise<Client> {
// We need to override default factory with 'ws' one.
// eslint-disable-next-line
const WebSocket = require('ws')
setMetadata(client.metadata.UseBinaryProtocol, true)
setMetadata(client.metadata.UseProtocolCompression, true)
setMetadata(client.metadata.ConnectionTimeout, connectTimeout)
setMetadata(client.metadata.ClientSocketFactory, (url) => {
const socket = new WebSocket(url, {
headers: {
'User-Agent': getMetadata(plugin.metadata.UserAgent) ?? 'Anticrm Client'
}
})
return socket
})
addLocation(clientId, () => import('@hcengineering/client-resources'))
if (model !== undefined) {
let prev = ''
const hashes = model.map((it) => {
const h = crypto.createHash('sha1')
h.update(prev)
h.update(JSON.stringify(it))
prev = h.digest('hex')
return prev
})
setMetadata(client.metadata.OverridePersistenceStore, {
load: async () => ({
hash: hashes[hashes.length - 1],
transactions: model,
full: true
}),
store: async (model: LoadModelResponse) => {}
})
}
const clientFactory = await getResource(client.function.GetClient)
return await clientFactory(token, transactorUrl)
}
// Will use temporary file to store huge content into
export class BlobClient {
transactorAPIUrl: string
index: number
constructor (
readonly transactorUrl: string,
readonly token: string,
readonly workspace: WorkspaceId,
readonly opt?: {
storageAdapter?: StorageAdapter
}
) {
this.index = 0
let url = transactorUrl
if (url.endsWith('/')) {
url = url.slice(0, url.length - 1)
}
this.transactorAPIUrl = url.replaceAll('wss://', 'https://').replace('ws://', 'http://') + '/api/v1/blob'
}
async checkFile (ctx: MeasureContext, name: string): Promise<boolean> {
if (this.opt?.storageAdapter !== undefined) {
const obj = await this.opt?.storageAdapter.stat(ctx, this.workspace, name)
if (obj !== undefined) {
return true
}
}
for (let i = 0; i < 5; i++) {
try {
const response = await fetch(this.transactorAPIUrl + `?name=${encodeURIComponent(name)}`, {
headers: {
Authorization: 'Bearer ' + this.token,
Range: 'bytes=0-1'
}
})
if (response.status === 404) {
return false
}
const buff = await response.arrayBuffer()
return buff.byteLength > 0
} catch (err: any) {
if (i === 4) {
ctx.error('Failed to check file', { name, error: err })
}
await new Promise<void>((resolve) => setTimeout(resolve, 500))
}
}
return false
}
async writeTo (
ctx: MeasureContext,
name: string,
size: number,
writable: {
write: (buffer: Buffer, cb: (err?: any) => void) => void
end: (cb: () => void) => void
}
): Promise<void> {
let written = 0
const chunkSize = 50 * 1024 * 1024
let writtenMb = 0
// Use ranges to iterave through file with retry if required.
while (written < size || size === -1) {
let i = 0
let response: Response | undefined
for (; i < 5; i++) {
try {
const st = Date.now()
let chunk: Buffer
if (this.opt?.storageAdapter !== undefined) {
const chunks: Buffer[] = []
const readable = await this.opt.storageAdapter.partial(ctx, this.workspace, name, written, chunkSize)
await new Promise<void>((resolve) => {
readable.on('data', (chunk) => {
chunks.push(chunk)
})
readable.on('end', () => {
resolve()
})
})
chunk = Buffer.concat(chunks)
} else {
const header: Record<string, string> = {
Authorization: 'Bearer ' + this.token
}
if (!(size !== -1 && written === 0 && size < chunkSize)) {
header.Range = `bytes=${written}-${size === -1 ? written + chunkSize : Math.min(size - 1, written + chunkSize)}`
}
response = await fetch(this.transactorAPIUrl + `?name=${encodeURIComponent(name)}`, { headers: header })
if (header.Range != null) {
ctx.info('fetch part', { time: Date.now() - st, blobId: name, written, size })
}
if (response.status === 403) {
i = 5
// No file, so make it empty
throw new Error(`Unauthorized ${this.transactorAPIUrl}/${this.workspace.name}/${name}`)
}
if (response.status === 404) {
i = 5
// No file, so make it empty
throw new Error(`No file for ${this.transactorAPIUrl}/${this.workspace.name}/${name}`)
}
if (response.status === 416) {
if (size === -1) {
size = parseInt((response.headers.get('content-range') ?? '').split('*/')[1])
continue
}
// No file, so make it empty
throw new Error(`No file for ${this.transactorAPIUrl}/${this.workspace.name}/${name}`)
}
chunk = Buffer.from(await response.arrayBuffer())
if (header.Range == null) {
size = chunk.length
}
// We need to parse
// 'Content-Range': `bytes ${start}-${end}/${size}`
// To determine if something is left
const range = response.headers.get('Content-Range')
if (range !== null) {
const [, total] = range.split(' ')[1].split('/')
if (total !== undefined) {
size = parseInt(total)
}
}
}
await new Promise<void>((resolve, reject) => {
writable.write(chunk, (err) => {
if (err != null) {
reject(err)
}
resolve()
})
})
written += chunk.length
const newWrittenMb = Math.round(written / (1024 * 1024))
const newWrittenId = Math.round(newWrittenMb / 5)
if (writtenMb !== newWrittenId) {
writtenMb = newWrittenId
ctx.info(' >>>>Chunk', {
name,
written: newWrittenMb,
of: Math.round(size / (1024 * 1024))
})
}
break
} catch (err: any) {
if (err?.code === 'NoSuchKey') {
ctx.info('No such key', { name })
return
}
if (i > 4) {
await new Promise<void>((resolve) => {
writable.end(resolve)
})
throw err
}
await new Promise<void>((resolve) => setTimeout(resolve, 1000))
// retry
}
}
}
await new Promise<void>((resolve) => {
writable.end(resolve)
})
}
async upload (ctx: MeasureContext, name: string, size: number, contentType: string, buffer: Buffer): Promise<void> {
if (this.opt?.storageAdapter !== undefined) {
await this.opt.storageAdapter.put(ctx, this.workspace, name, buffer, contentType, size)
} else {
// TODO: We need to improve this logig, to allow restore of huge blobs
for (let i = 0; i < 5; i++) {
try {
await fetch(
this.transactorAPIUrl +
`?name=${encodeURIComponent(name)}&contentType=${encodeURIComponent(contentType)}&size=${size}`,
{
method: 'PUT',
headers: {
Authorization: 'Bearer ' + this.token,
'Content-Type': contentType
},
body: buffer
}
)
break
} catch (err: any) {
if (i === 4) {
ctx.error('failed to upload file', { name })
throw err
}
}
}
}
}
}
export * from './account'
export * from './blob'
export * from './client'

View File

@ -111,7 +111,8 @@ export async function initModel (
workspaceId: WorkspaceId,
rawTxes: Tx[],
logger: ModelLogger = consoleModelLogger,
progress: (value: number) => Promise<void>
progress: (value: number) => Promise<void>,
deleteFirst: boolean = false
): Promise<void> {
const { mongodbUri, txes } = prepareTools(rawTxes)
if (txes.some((tx) => tx.objectSpace !== core.space.Model)) {
@ -125,6 +126,21 @@ export async function initModel (
try {
const db = getWorkspaceDB(client, workspaceId)
if (deleteFirst) {
logger.log('deleting model...', workspaceId)
const result = await ctx.with(
'mongo-delete',
{},
async () =>
await db.collection(DOMAIN_TX).deleteMany({
objectSpace: core.space.Model,
modifiedBy: core.account.System,
objectClass: { $nin: [contact.class.PersonAccount, 'contact:class:EmployeeAccount'] }
})
)
logger.log('transactions deleted.', { workspaceId: workspaceId.name, count: result.deletedCount })
}
logger.log('creating model...', workspaceId)
const result = await db.collection(DOMAIN_TX).insertMany(txes as Document[])
logger.log('model transactions inserted.', { count: result.insertedCount })
@ -135,7 +151,7 @@ export async function initModel (
await progress(60)
logger.log('create minio bucket', { workspaceId })
logger.log('create storage bucket', { workspaceId })
await storageAdapter.make(ctx, workspaceId)
await progress(100)
@ -212,11 +228,11 @@ export async function initializeWorkspace (
progress: (value: number) => Promise<void>
): Promise<void> {
const initWS = branding?.initWorkspace ?? getMetadata(toolPlugin.metadata.InitWorkspace)
const sriptUrl = getMetadata(toolPlugin.metadata.InitScriptURL)
if (initWS === undefined || sriptUrl === undefined) return
const scriptUrl = getMetadata(toolPlugin.metadata.InitScriptURL)
if (initWS === undefined || scriptUrl === undefined) return
try {
// `https://raw.githubusercontent.com/hcengineering/init/main/script.yaml`
const req = await fetch(sriptUrl)
const req = await fetch(scriptUrl)
const text = await req.text()
const scripts = yaml.load(text) as any as InitScript[]
let script: InitScript | undefined
@ -233,7 +249,7 @@ export async function initializeWorkspace (
const initializer = new WorkspaceInitializer(ctx, storageAdapter, wsUrl, client)
await initializer.processScript(script, logger, progress)
} catch (err: any) {
ctx.error('Failed to create workspace', { error: err })
ctx.error('Failed to initialize workspace', { error: err })
throw err
}
}

View File

@ -0,0 +1,7 @@
module.exports = {
extends: ['./node_modules/@hcengineering/platform-rig/profiles/node/eslint.config.json'],
parserOptions: {
tsconfigRootDir: __dirname,
project: './tsconfig.json'
}
}

View File

@ -0,0 +1,4 @@
*
!/lib/**
!CHANGELOG.md
/lib/**/__tests__/

View File

@ -0,0 +1,20 @@
#!/usr/bin/env bash
#
# Copyright © 2020, 2021 Anticrm Platform Contributors.
# Copyright © 2021 Hardcore Engineering Inc.
#
# Licensed under the Eclipse Public License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License. You may
# obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#
# See the License for the specific language governing permissions and
# limitations under the License.
#
rushx bundle
rushx docker:build
rushx docker:push

View File

@ -0,0 +1,5 @@
{
"$schema": "https://developer.microsoft.com/json-schemas/rig-package/rig.schema.json",
"rigPackageName": "@hcengineering/platform-rig",
"rigProfile": "node"
}

View File

@ -0,0 +1,7 @@
module.exports = {
preset: 'ts-jest',
testEnvironment: 'node',
testMatch: ['**/?(*.)+(spec|test).[jt]s?(x)'],
roots: ["./src"],
coverageReporters: ["text-summary", "html"]
}

View File

@ -0,0 +1,59 @@
{
"name": "@hcengineering/workspace-service",
"version": "0.6.0",
"main": "lib/index.js",
"svelte": "src/index.ts",
"types": "types/index.d.ts",
"author": "Hardcore Engineering Inc.",
"template": "@hcengineering/node-package",
"scripts": {
"start": "ts-node src/__start.ts",
"build": "compile",
"build:watch": "compile",
"format": "format src",
"test": "jest --passWithNoTests --silent --forceExit",
"_phase:build": "compile transpile src",
"_phase:test": "jest --passWithNoTests --silent --forceExit",
"_phase:format": "format src",
"_phase:validate": "compile validate"
},
"devDependencies": {
"cross-env": "~7.0.3",
"@hcengineering/platform-rig": "^0.6.0",
"@types/node": "~20.11.16",
"@typescript-eslint/eslint-plugin": "^6.11.0",
"eslint-plugin-import": "^2.26.0",
"eslint-plugin-promise": "^6.1.1",
"eslint-plugin-n": "^15.4.0",
"eslint": "^8.54.0",
"esbuild": "^0.20.0",
"@types/koa-bodyparser": "^4.3.12",
"@types/koa-router": "^7.4.8",
"@types/koa": "^2.15.0",
"@types/koa__cors": "^5.0.0",
"@typescript-eslint/parser": "^6.11.0",
"eslint-config-standard-with-typescript": "^40.0.0",
"prettier": "^3.1.0",
"ts-node": "^10.8.0",
"typescript": "^5.3.3",
"jest": "^29.7.0",
"ts-jest": "^29.1.1",
"@types/jest": "^29.5.5"
},
"dependencies": {
"@hcengineering/model": "^0.6.11",
"@hcengineering/platform": "^0.6.11",
"@hcengineering/core": "^0.6.32",
"koa": "^2.15.3",
"koa-router": "^12.0.1",
"koa-bodyparser": "^4.4.1",
"@koa/cors": "^5.0.0",
"@hcengineering/server-core": "^0.6.1",
"@hcengineering/server-tool": "^0.6.0",
"@hcengineering/server-storage": "^0.6.0",
"@hcengineering/server-pipeline": "^0.6.0",
"@hcengineering/server-client": "^0.6.0",
"@hcengineering/server-token": "^0.6.11",
"@hcengineering/analytics": "^0.6.0"
}
}

View File

@ -0,0 +1,137 @@
//
// Copyright © 2024 Hardcore Engineering Inc.
//
// Licensed under the Eclipse Public License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License. You may
// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//
// See the License for the specific language governing permissions and
// limitations under the License.
//
import { Analytics } from '@hcengineering/analytics'
import {
versionToString,
type BrandingMap,
type Data,
type MeasureContext,
type Tx,
type Version
} from '@hcengineering/core'
import { type MigrateOperation } from '@hcengineering/model'
import { setMetadata } from '@hcengineering/platform'
import serverClientPlugin from '@hcengineering/server-client'
import serverToken from '@hcengineering/server-token'
import toolPlugin from '@hcengineering/server-tool'
import { WorkspaceWorker } from './service'
export * from './ws-operations'
/**
* @public
*/
export function serveWorkspaceAccount (
measureCtx: MeasureContext,
version: Data<Version>,
txes: Tx[],
migrateOperations: [string, MigrateOperation][],
brandings: BrandingMap,
onClose?: () => void
): void {
const region = process.env.REGION ?? ''
const wsOperation = process.env.WS_OPERATION ?? 'all'
if (wsOperation !== 'all' && wsOperation !== 'create' && wsOperation !== 'upgrade') {
console.log(`Invalid operation provided: ${wsOperation}. Must be one of 'all', 'create', 'upgrade'`)
process.exit(1)
}
console.log(
'Starting workspace service in region:',
region === '' ? 'DEFAULT' : region,
'for operation:',
wsOperation,
'for version:',
versionToString(version),
'with brandings:',
brandings
)
const accountUri = process.env.ACCOUNTS_URL
if (accountUri === undefined) {
console.log('Please provide account url')
process.exit(1)
}
setMetadata(serverClientPlugin.metadata.Endpoint, accountUri)
const transactorUri = process.env.TRANSACTOR_URL
if (transactorUri === undefined) {
console.log('Please provide transactor url')
process.exit(1)
}
const serverSecret = process.env.SERVER_SECRET
if (serverSecret === undefined) {
console.log('Please provide server secret')
process.exit(1)
}
// Required by the tool
const dbUri = process.env.MONGO_URL
if (dbUri === undefined) {
console.log('Please provide mongodb url')
process.exit(1)
}
const waitTimeout = parseInt(process.env.WAIT_TIMEOUT ?? '5000')
setMetadata(serverToken.metadata.Secret, serverSecret)
const initWS = process.env.INIT_WORKSPACE
if (initWS !== undefined) {
setMetadata(toolPlugin.metadata.InitWorkspace, initWS)
}
const initScriptUrl = process.env.INIT_SCRIPT_URL
if (initScriptUrl !== undefined) {
setMetadata(toolPlugin.metadata.InitScriptURL, initScriptUrl)
}
setMetadata(serverClientPlugin.metadata.UserAgent, 'WorkspaceService')
const worker = new WorkspaceWorker(
version,
txes,
migrateOperations,
region,
parseInt(process.env.PARALLEL ?? '1'),
wsOperation,
brandings
)
void worker.start(measureCtx, {
errorHandler: async (ws, err) => {
Analytics.handleError(err)
},
force: false,
console: false,
logs: 'upgrade-logs',
waitTimeout
})
const close = (): void => {
onClose?.()
}
process.on('uncaughtException', (e) => {
measureCtx.error('uncaughtException', { error: e })
})
process.on('unhandledRejection', (reason, promise) => {
measureCtx.error('Unhandled Rejection at:', { reason, promise })
})
process.on('SIGINT', close)
process.on('SIGTERM', close)
process.on('exit', close)
}

View File

@ -0,0 +1,258 @@
//
// Copyright © 2024 Hardcore Engineering Inc.
//
// Licensed under the Eclipse Public License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License. You may
// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//
// See the License for the specific language governing permissions and
// limitations under the License.
//
import {
type BrandingMap,
RateLimiter,
systemAccountEmail,
type BaseWorkspaceInfo,
type Data,
type MeasureContext,
type Tx,
type Version,
getBranding,
getWorkspaceId
} from '@hcengineering/core'
import { type MigrateOperation, type ModelLogger } from '@hcengineering/model'
import { getPendingWorkspace, updateWorkspaceInfo, workerHandshake } from '@hcengineering/server-client'
import { generateToken } from '@hcengineering/server-token'
import { FileModelLogger } from '@hcengineering/server-tool'
import path from 'path'
import { upgradeWorkspace, createWorkspace } from './ws-operations'
export interface WorkspaceOptions {
errorHandler: (workspace: BaseWorkspaceInfo, error: any) => Promise<void>
force: boolean
console: boolean
logs: string
ignore?: string
waitTimeout: number
}
export class WorkspaceWorker {
rateLimit: RateLimiter
constructor (
readonly version: Data<Version>,
readonly txes: Tx[],
readonly migrationOperation: [string, MigrateOperation][],
readonly region: string,
readonly limit: number,
readonly operation: 'create' | 'upgrade' | 'all',
readonly brandings: BrandingMap
) {
this.rateLimit = new RateLimiter(limit)
}
// Note: not gonna use it for now
wakeup: () => void = () => {}
defaultWakeup: () => void = () => {}
async start (ctx: MeasureContext, opt: WorkspaceOptions): Promise<void> {
this.defaultWakeup = () => {
ctx.info("I'm busy", { version: this.version, region: this.region })
}
this.wakeup = this.defaultWakeup
const token = generateToken(systemAccountEmail, { name: '-' }, { service: 'workspace' })
await workerHandshake(token, this.region, this.version, this.operation)
while (true) {
const workspace = await ctx.with('get-pending-workspace', {}, async (ctx) => {
try {
return await getPendingWorkspace(token, this.region, this.version, this.operation)
} catch (err) {
ctx.error('Error getting pending workspace:', { err })
}
})
if (workspace === undefined) {
await this.doSleep(ctx, opt)
} else {
await this.rateLimit.exec(async () => {
await this.doWorkspaceOperation(ctx, workspace, opt)
})
}
}
}
private async _createWorkspace (ctx: MeasureContext, ws: BaseWorkspaceInfo, opt: WorkspaceOptions): Promise<void> {
const t = Date.now()
const ctxModelLogger: ModelLogger = {
log (msg: string, data: any): void {
ctx.info(msg, data)
},
error (msg: string, data: any): void {
ctx.error(msg, data)
}
}
const logger = opt.console ? ctxModelLogger : new FileModelLogger(path.join(opt.logs, `${ws.workspace}.log`))
ctx.info('---CREATING----', {
workspace: ws.workspace,
version: this.version,
region: this.region
})
try {
const branding = getBranding(this.brandings, ws.branding)
const wsId = getWorkspaceId(ws.workspace)
const token = generateToken(systemAccountEmail, wsId, { service: 'workspace' })
const handleWsEvent = updateWorkspaceInfo.bind(null, token, ws.workspace)
if (ws.mode !== 'creating' || (ws.progress ?? 0) < 30) {
await createWorkspace(ctx, this.version, branding, ws, this.txes, this.migrationOperation, handleWsEvent)
} else {
// The previous attempth failed during init script and we cannot really retry it.
// But it should not be a blocker though. We can just warn user about that if we want.
// So we don't clear the previous error message if any
await handleWsEvent?.('create-done', this.version, ws.progress ?? 0)
}
ctx.info('---CREATE-DONE---------', {
workspace: ws.workspace,
version: this.version,
region: this.region,
time: Date.now() - t
})
} catch (err: any) {
await opt.errorHandler(ws, err)
logger.log('error', err)
if (!opt.console) {
ctx.error('error', { err })
}
ctx.info('---CREATE-FAILED---------', {
workspace: ws.workspace,
version: this.version,
region: this.region,
time: Date.now() - t
})
} finally {
if (!opt.console) {
;(logger as FileModelLogger).close()
}
}
}
private async _upgradeWorkspace (ctx: MeasureContext, ws: BaseWorkspaceInfo, opt: WorkspaceOptions): Promise<void> {
if (ws.disabled === true || (opt.ignore ?? '').includes(ws.workspace)) {
return
}
const t = Date.now()
const ctxModelLogger: ModelLogger = {
log (msg: string, data: any): void {
ctx.info(msg, data)
},
error (msg: string, data: any): void {
ctx.error(msg, data)
}
}
const logger = opt.console ? ctxModelLogger : new FileModelLogger(path.join(opt.logs, `${ws.workspace}.log`))
ctx.info('---UPGRADING----', {
workspace: ws.workspace,
version: this.version,
region: this.region
})
try {
const wsId = getWorkspaceId(ws.workspace)
const token = generateToken(systemAccountEmail, wsId, { service: 'workspace' })
const handleWsEvent = updateWorkspaceInfo.bind(null, token, ws.workspace)
await upgradeWorkspace(
ctx,
this.version,
this.txes,
this.migrationOperation,
ws,
logger,
handleWsEvent,
opt.force
)
ctx.info('---UPGRADE-DONE---------', {
workspace: ws.workspace,
version: this.version,
region: this.region,
time: Date.now() - t
})
} catch (err: any) {
await opt.errorHandler(ws, err)
logger.log('error', err)
if (!opt.console) {
ctx.error('error', { err })
}
ctx.info('---UPGRADE-FAILED---------', {
workspace: ws.workspace,
version: this.version,
region: this.region,
time: Date.now() - t
})
} finally {
if (!opt.console) {
;(logger as FileModelLogger).close()
}
}
}
private async doWorkspaceOperation (
ctx: MeasureContext,
workspace: BaseWorkspaceInfo,
opt: WorkspaceOptions
): Promise<void> {
switch (workspace.mode) {
case 'creating':
case 'pending-creation':
// We need to either start workspace creation
// or see if we need to restart it
await this._createWorkspace(ctx, workspace, opt)
break
case 'upgrading':
case 'active':
// It seem version upgrade is required, or upgrade is not finished on previoous iteration.
// It's safe to upgrade the workspace again as the procedure allows re-trying.
await this._upgradeWorkspace(ctx, workspace, opt)
break
case 'deleting':
// Seems we failed to delete, so let's restore deletion.
// TODO: move from account
break
}
}
private async doSleep (ctx: MeasureContext, opt: WorkspaceOptions): Promise<void> {
await new Promise<void>((resolve) => {
const wakeup: () => void = () => {
resolve()
this.wakeup = this.defaultWakeup
}
// sleep for 5 seconds for the next operation, or until a wakeup event
const sleepHandle = setTimeout(wakeup, opt.waitTimeout)
this.wakeup = () => {
clearTimeout(sleepHandle)
wakeup()
}
})
}
}

View File

@ -0,0 +1,259 @@
import core, {
type Client,
getWorkspaceId,
systemAccountEmail,
versionToString,
type BaseWorkspaceInfo,
type Branding,
type Data,
type MeasureContext,
type Tx,
type Version,
type WorkspaceIdWithUrl,
TxOperations
} from '@hcengineering/core'
import { consoleModelLogger, type MigrateOperation, type ModelLogger } from '@hcengineering/model'
import { getTransactorEndpoint } from '@hcengineering/server-client'
import {
DummyFullTextAdapter,
type Pipeline,
SessionContextImpl,
type PipelineFactory,
type StorageConfiguration
} from '@hcengineering/server-core'
import { buildStorageFromConfig, storageConfigFromEnv } from '@hcengineering/server-storage'
import { generateToken } from '@hcengineering/server-token'
import { initModel, prepareTools, upgradeModel, initializeWorkspace, updateModel } from '@hcengineering/server-tool'
import {
createIndexStages,
createServerPipeline,
registerServerPlugins,
registerStringLoaders
} from '@hcengineering/server-pipeline'
function wrapPipeline (ctx: MeasureContext, pipeline: Pipeline, wsUrl: WorkspaceIdWithUrl): Client {
const sctx = new SessionContextImpl(
ctx,
systemAccountEmail,
'backup',
true,
{ targets: {}, txes: [] },
wsUrl,
null,
false,
new Map(),
new Map()
)
return {
findAll: async (_class, query, options) => {
return await pipeline.findAll(sctx, _class, query, options)
},
findOne: async (_class, query, options) => {
return (await pipeline.findAll(sctx, _class, query, { ...options, limit: 1 })).shift()
},
close: async () => {
await pipeline.close()
},
getHierarchy: () => {
return pipeline.storage.hierarchy
},
getModel: () => {
return pipeline.storage.modelDb
},
searchFulltext: async (query, options) => {
return {
docs: [],
total: 0
}
},
tx: async (tx) => {
return await pipeline.tx(sctx, tx)
},
notify: (...tx) => {}
}
}
/**
* @public
*/
export async function createWorkspace (
ctx: MeasureContext,
version: Data<Version>,
branding: Branding | null,
workspaceInfo: BaseWorkspaceInfo,
txes: Tx[],
migrationOperation: [string, MigrateOperation][],
handleWsEvent?: (
event: 'ping' | 'create-started' | 'progress' | 'create-done',
version: Data<Version>,
progress: number,
message?: string
) => Promise<void>
): Promise<void> {
const childLogger = ctx.newChild('createWorkspace', {}, { workspace: workspaceInfo.workspace })
const ctxModellogger: ModelLogger = {
log: (msg, data) => {
childLogger.info(msg, data)
},
error: (msg, data) => {
childLogger.error(msg, data)
}
}
const createPingHandle = setInterval(() => {
void handleWsEvent?.('ping', version, 0)
}, 5000)
try {
const wsUrl: WorkspaceIdWithUrl = {
name: workspaceInfo.workspace,
workspaceName: workspaceInfo.workspaceName ?? '',
workspaceUrl: workspaceInfo.workspaceUrl ?? ''
}
const wsId = getWorkspaceId(workspaceInfo.workspace)
await handleWsEvent?.('create-started', version, 10)
await childLogger.withLog('init-workspace', {}, async (ctx) => {
const deleteModelFirst = workspaceInfo.mode === 'creating'
await initModel(
ctx,
wsId,
txes,
ctxModellogger,
async (value) => {
await handleWsEvent?.('progress', version, 10 + Math.round((Math.min(value, 100) / 100) * 10))
},
deleteModelFirst
)
})
const { mongodbUri } = prepareTools([])
const storageConfig: StorageConfiguration = storageConfigFromEnv()
const storageAdapter = buildStorageFromConfig(storageConfig, mongodbUri)
try {
registerServerPlugins()
registerStringLoaders()
const factory: PipelineFactory = createServerPipeline(
ctx,
mongodbUri,
{
externalStorage: storageAdapter,
fullTextUrl: 'http://localhost:9200',
indexParallel: 0,
indexProcessing: 0,
rekoniUrl: '',
usePassedCtx: true
},
{
fulltextAdapter: {
factory: async () => new DummyFullTextAdapter(),
url: '',
stages: (adapter, storage, storageAdapter, contentAdapter) =>
createIndexStages(
ctx.newChild('stages', {}),
wsUrl,
branding,
adapter,
storage,
storageAdapter,
contentAdapter,
0,
0
)
}
}
)
const pipeline = await factory(ctx, wsUrl, true, () => {}, null)
const client = new TxOperations(wrapPipeline(ctx, pipeline, wsUrl), core.account.System)
await updateModel(ctx, wsId, migrationOperation, client, ctxModellogger, async (value) => {
await handleWsEvent?.('progress', version, 20 + Math.round((Math.min(value, 100) / 100) * 10))
})
await initializeWorkspace(ctx, branding, wsUrl, storageAdapter, client, ctxModellogger, async (value) => {
await handleWsEvent?.('progress', version, 30 + Math.round((Math.min(value, 100) / 100) * 70))
})
await pipeline.close()
await handleWsEvent?.('create-done', version, 100, '')
} catch (err: any) {
await handleWsEvent?.('ping', version, 0, `Create failed: ${err.message}`)
} finally {
await storageAdapter.close()
}
} finally {
clearInterval(createPingHandle)
childLogger.end()
}
}
/**
* @public
*/
export async function upgradeWorkspace (
ctx: MeasureContext,
version: Data<Version>,
txes: Tx[],
migrationOperation: [string, MigrateOperation][],
ws: BaseWorkspaceInfo,
logger: ModelLogger = consoleModelLogger,
handleWsEvent?: (
event: 'upgrade-started' | 'progress' | 'upgrade-done' | 'ping',
version: Data<Version>,
progress: number,
message?: string
) => Promise<void>,
forceUpdate: boolean = true,
forceIndexes: boolean = false,
external: boolean = false
): Promise<void> {
const versionStr = versionToString(version)
if (ws?.version !== undefined && !forceUpdate && versionStr === versionToString(ws.version)) {
return
}
ctx.info('upgrading', {
force: forceUpdate,
currentVersion: ws?.version !== undefined ? versionToString(ws.version) : '',
toVersion: versionStr,
workspace: ws.workspace
})
const wsId = getWorkspaceId(ws.workspace)
const token = generateToken(systemAccountEmail, wsId, { service: 'workspace' })
let progress = 0
const updateProgressHandle = setInterval(() => {
void handleWsEvent?.('progress', version, progress)
}, 5000)
try {
await handleWsEvent?.('upgrade-started', version, 0)
await upgradeModel(
ctx,
await getTransactorEndpoint(token, external ? 'external' : 'internal'),
getWorkspaceId(ws.workspace),
txes,
migrationOperation,
logger,
false,
async (value) => {
progress = value
},
forceIndexes
)
await handleWsEvent?.('upgrade-done', version, 100, '')
} catch (err: any) {
await handleWsEvent?.('ping', version, 0, `Upgrade failed: ${err.message}`)
} finally {
clearInterval(updateProgressHandle)
}
}

View File

@ -0,0 +1,10 @@
{
"extends": "./node_modules/@hcengineering/platform-rig/profiles/node/tsconfig.json",
"compilerOptions": {
"rootDir": "./src",
"outDir": "./lib",
"declarationDir": "./types",
"tsBuildInfoFile": ".build/build.tsbuildinfo"
}
}

View File

@ -255,7 +255,7 @@ class TSessionManager implements SessionManager {
if (userInfo.error !== undefined) {
throw new Error(JSON.stringify(userInfo.error))
}
return { ...userInfo.result, upgrade: userInfo.upgrade }
return userInfo.result
} catch (err: any) {
if (err?.cause?.code === 'ECONNRESET' || err?.cause?.code === 'ECONNREFUSED') {
return undefined
@ -294,7 +294,7 @@ class TSessionManager implements SessionManager {
return { upgrade: true }
}
if (workspaceInfo?.creating === true && token.email !== systemAccountEmail) {
if (['pending-creation', 'creating'].includes(workspaceInfo?.mode) && token.email !== systemAccountEmail) {
// No access to workspace for token.
return { error: new Error(`Workspace during creation phase ${token.email} ${token.workspace.name}`) }
}
@ -428,8 +428,8 @@ class TSessionManager implements SessionManager {
createdBy: '',
createdOn: Date.now(),
lastVisit: Date.now(),
createProgress: 100,
creating: false,
mode: 'active',
progress: 100,
disabled: false,
endpoint: ''
}

View File

@ -125,7 +125,7 @@ export class AIBotController {
return
}
if (info.creating === true) {
if (['pending-creation', 'creating'].includes(info?.mode)) {
this.ctx.info('Workspace is creating -> waiting...', { workspace })
this.assignTimeout = setTimeout(() => {
void this.assignToWorkspace(workspace)

View File

@ -137,7 +137,7 @@ export class Collector {
return false
}
if (info?.creating === true) {
if (['pending-creation', 'creating'].includes(info?.mode)) {
return false
}

View File

@ -48,6 +48,23 @@ services:
- STORAGE_CONFIG=${STORAGE_CONFIG}
- MODEL_ENABLED=*
- BRANDING_PATH=/var/cfg/branding.json
workspace:
image: hardcoreeng/workspace
links:
- mongodb
- minio
volumes:
- ./branding-test.json:/var/cfg/branding.json
environment:
- SERVER_SECRET=secret
- MONGO_URL=mongodb://mongodb:27018
- TRANSACTOR_URL=ws://transactor:3334;ws://localhost:3334
- STORAGE_CONFIG=${STORAGE_CONFIG}
- MODEL_ENABLED=*
- ACCOUNTS_URL=http://account:3003
- BRANDING_PATH=/var/cfg/branding.json
- NOTIFY_INBOX_ONLY=true
restart: unless-stopped
front:
image: hardcoreeng/front
pull_policy: never