Use storage adapter instead of fetch in services (#6269)

Signed-off-by: Alexander Onnikov <Alexander.Onnikov@xored.com>
This commit is contained in:
Alexander Onnikov 2024-08-08 18:59:04 +07:00 committed by GitHub
parent 73cc6b8dd1
commit a966e184c1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
27 changed files with 251 additions and 316 deletions

3
.vscode/launch.json vendored
View File

@ -280,7 +280,8 @@
"SERVER_SECRET": "secret",
"ACCOUNTS_URL": "http://localhost:3000",
"COLLABORATOR_API_URL": "http://localhost:3078",
"FRONT_URL": "http://localhost:8080"
"STORAGE_CONFIG": "minio|localhost?accessKey=minioadmin&secretKey=minioadmin",
"MONGO_URL": "mongodb://localhost:27017"
},
"runtimeArgs": ["--nolazy", "-r", "ts-node/register"],
"runtimeVersion": "20",

View File

@ -1538,9 +1538,6 @@ dependencies:
fork-ts-checker-webpack-plugin:
specifier: ~7.3.0
version: 7.3.0(typescript@5.3.3)(webpack@5.90.3)
form-data:
specifier: ^4.0.0
version: 4.0.0
gaxios:
specifier: ^5.0.1
version: 5.1.3
@ -29133,7 +29130,7 @@ packages:
dev: false
file:projects/pod-calendar.tgz(bufferutil@4.0.8)(ts-node@10.9.2)(utf-8-validate@6.0.4):
resolution: {integrity: sha512-rnCLCkBGWm2o9YTk6AnoYlNCACtncXiYw+CUkhVFJmBNTE1l90C74Uci4ldJBpxEOyCHVBzJIjsp6sMbQwafkw==, tarball: file:projects/pod-calendar.tgz}
resolution: {integrity: sha512-1kg3vKf6VEBcDutX3v29cjqDtrRghg7rXglm3OHuMwXC6WewfD+aM6xQ8hlUZjcVIq09Hv0nVr/n2SxUtal8LQ==, tarball: file:projects/pod-calendar.tgz}
id: file:projects/pod-calendar.tgz
name: '@rush-temp/pod-calendar'
version: 0.0.0
@ -29363,7 +29360,7 @@ packages:
dev: false
file:projects/pod-gmail.tgz(bufferutil@4.0.8)(ts-node@10.9.2)(utf-8-validate@6.0.4):
resolution: {integrity: sha512-qeyt7Pl1TwEo/JojBg4R4LR3uWEdxOfqfz6tYB5kB27oEdE+jBJx0LGgsrDYoQSsv7A0LLnaFVfS+XT17EoKeQ==, tarball: file:projects/pod-gmail.tgz}
resolution: {integrity: sha512-o5Ml8egkEHpCTaZ/hYgdA/5naofYkU52SAHnIZhyeFgIYKttpuNqi0wb25P36h3xhAA5WFdyytNPHOf7OBL4VA==, tarball: file:projects/pod-gmail.tgz}
id: file:projects/pod-gmail.tgz
name: '@rush-temp/pod-gmail'
version: 0.0.0
@ -29374,6 +29371,7 @@ packages:
'@types/jest': 29.5.12
'@types/node': 20.11.19
'@types/node-fetch': 2.6.11
'@types/uuid': 8.3.4
'@types/ws': 8.5.11
'@typescript-eslint/eslint-plugin': 6.21.0(@typescript-eslint/parser@6.21.0)(eslint@8.56.0)(typescript@5.3.3)
'@typescript-eslint/parser': 6.21.0(eslint@8.56.0)(typescript@5.3.3)
@ -29401,6 +29399,7 @@ packages:
ts-jest: 29.1.2(esbuild@0.20.1)(jest@29.7.0)(typescript@5.3.3)
ts-node-dev: 2.0.0(@types/node@20.11.19)(typescript@5.3.3)
typescript: 5.3.3
uuid: 8.3.2
ws: 8.18.0(bufferutil@4.0.8)(utf-8-validate@6.0.4)
transitivePeerDependencies:
- '@aws-sdk/credential-providers'
@ -29710,7 +29709,7 @@ packages:
dev: false
file:projects/pod-telegram.tgz(bufferutil@4.0.8)(ts-node@10.9.2)(utf-8-validate@6.0.4):
resolution: {integrity: sha512-MF+eEeVhFR4XQj2YaAP6gjvm1tijtnRUMDrQt48vZBD8mwQA8B0drOVHCkOQ+NJOYqi1c3pRG/+kPKWqswbplQ==, tarball: file:projects/pod-telegram.tgz}
resolution: {integrity: sha512-GCgA0Jny/OVcKWLkF1oA4awBVXQhFnHrfWujxfxke9g9S5FXNKrzZe/DCTlK/d/hRJ1XFj0Zw03rvBDhAKfOsg==, tarball: file:projects/pod-telegram.tgz}
id: file:projects/pod-telegram.tgz
name: '@rush-temp/pod-telegram'
version: 0.0.0
@ -29722,6 +29721,7 @@ packages:
'@types/mime': 3.0.4
'@types/node': 20.11.19
'@types/node-fetch': 2.6.11
'@types/uuid': 8.3.4
'@types/ws': 8.5.11
'@typescript-eslint/eslint-plugin': 6.21.0(@typescript-eslint/parser@6.21.0)(eslint@8.56.0)(typescript@5.3.3)
'@typescript-eslint/parser': 6.21.0(eslint@8.56.0)(typescript@5.3.3)
@ -29749,6 +29749,7 @@ packages:
ts-node-dev: 2.0.0(@types/node@20.11.19)(typescript@5.3.3)
ts-standard: 12.0.2(typescript@5.3.3)
typescript: 5.3.3
uuid: 8.3.2
ws: 8.18.0(bufferutil@4.0.8)(utf-8-validate@6.0.4)
transitivePeerDependencies:
- '@aws-sdk/credential-providers'
@ -29760,7 +29761,6 @@ packages:
- babel-jest
- babel-plugin-macros
- bufferutil
- encoding
- gcp-metadata
- kerberos
- mongodb-client-encryption
@ -30161,7 +30161,7 @@ packages:
dev: false
file:projects/qms-doc-import-tool.tgz:
resolution: {integrity: sha512-jxeLHsk5jNj+ABvXoCiX9VTv7ovVKEdkwddcmwpdTGjt7hBgtYsirnVxkryV8tfWMs5y99wjroKGfwSqtyzzOg==, tarball: file:projects/qms-doc-import-tool.tgz}
resolution: {integrity: sha512-mzTjks1peZbU8165Sd1xaoYs9H9f37XszY2zoxtggnaJrP1GeEktuX0TtNTz1XJRsMOg0+0K3s3CRYUEM9+cYw==, tarball: file:projects/qms-doc-import-tool.tgz}
name: '@rush-temp/qms-doc-import-tool'
version: 0.0.0
dependencies:
@ -30171,6 +30171,7 @@ packages:
'@types/minio': 7.0.18
'@types/node': 20.11.19
'@types/node-fetch': 2.6.11
'@types/uuid': 8.3.4
'@typescript-eslint/eslint-plugin': 6.21.0(@typescript-eslint/parser@6.21.0)(eslint@8.56.0)(typescript@5.3.3)
'@typescript-eslint/parser': 6.21.0(eslint@8.56.0)(typescript@5.3.3)
commander: 8.3.0
@ -30193,6 +30194,7 @@ packages:
ts-jest: 29.1.2(esbuild@0.20.1)(jest@29.7.0)(typescript@5.3.3)
ts-node: 10.9.2(@types/node@20.11.19)(typescript@5.3.3)
typescript: 5.3.3
uuid: 8.3.2
zod: 3.23.8
transitivePeerDependencies:
- '@babel/core'
@ -30201,7 +30203,6 @@ packages:
- '@swc/wasm'
- babel-jest
- babel-plugin-macros
- encoding
- node-notifier
- supports-color
dev: false

View File

@ -15,7 +15,7 @@
"build:watch": "compile",
"_phase:bundle": "rushx bundle",
"bundle": "mkdir -p bundle && node esbuild.js",
"run-local": "cross-env SERVER_SECRET=secret COLLABORATOR_URL=ws://localhost:3078 COLLABORATOR_API_URL=http://localhost:3078 FRONT_URL=http://localhost:8087 PRODUCT_ID=ezqms node --nolazy -r ts-node/register ./src/__start.ts",
"run-local": "cross-env SERVER_SECRET=secret MONGO_URL=mongodb://localhost:27017 COLLABORATOR_URL=ws://localhost:3078 COLLABORATOR_API_URL=http://localhost:3078 STORAGE_CONFIG=minio|minio?accessKey=minioadmin&secretKey=minioadmin PRODUCT_ID=ezqms node --nolazy -r ts-node/register ./src/__start.ts",
"run": "cross-env node -r ts-node/register --max-old-space-size=8000 ./src/__start.ts",
"format": "format src",
"test": "jest --passWithNoTests --silent",
@ -36,7 +36,6 @@
"esbuild": "^0.20.0",
"@types/minio": "~7.0.11",
"@types/node": "~20.11.16",
"@types/node-fetch": "~2.6.2",
"@typescript-eslint/parser": "^6.11.0",
"eslint-config-standard-with-typescript": "^40.0.0",
"prettier": "^3.1.0",
@ -55,6 +54,7 @@
"@hcengineering/core": "^0.6.32",
"@hcengineering/platform": "^0.6.11",
"@hcengineering/server-core": "^0.6.1",
"@hcengineering/server-storage": "^0.6.0",
"@hcengineering/server-token": "^0.6.11",
"@hcengineering/server-tool": "^0.6.0",
"@hcengineering/server-client": "^0.6.0",
@ -62,11 +62,9 @@
"commander": "^8.1.0",
"domhandler": "^5.0.3",
"domutils": "^3.1.0",
"form-data": "^4.0.0",
"htmlparser2": "^9.0.0",
"mammoth": "^1.6.0",
"docx4js": "^3.2.20",
"node-fetch": "^2.6.6",
"zod": "^3.22.4"
}
}

View File

@ -1,3 +1,4 @@
import { MeasureContext } from '@hcengineering/core'
import docx4js from 'docx4js'
import { AnyNode } from 'domhandler'
@ -7,7 +8,7 @@ import importExtractedFile from './import'
import convert from './convert/convert'
import { Config } from './config'
export async function importDoc (config: Config): Promise<void> {
export async function importDoc (ctx: MeasureContext, config: Config): Promise<void> {
const { specFile, doc, backend } = config
const spec = await read(specFile)
@ -23,5 +24,5 @@ export async function importDoc (config: Config): Promise<void> {
const contents = await convert(doc, backend)
const extractedFile = await extract(contents, spec, headerRoot)
await importExtractedFile(config, extractedFile)
await importExtractedFile(ctx, config, extractedFile)
}

View File

@ -1,6 +1,7 @@
import { Employee } from '@hcengineering/contact'
import { Ref, WorkspaceId } from '@hcengineering/core'
import { DocumentSpace } from '@hcengineering/controlled-documents'
import { StorageAdapter } from '@hcengineering/server-core'
import { HtmlConversionBackend } from './convert/convert'
@ -13,5 +14,6 @@ export interface Config {
owner: Ref<Employee>
backend: HtmlConversionBackend
space: Ref<DocumentSpace>
storageAdapter: StorageAdapter
specFile?: string
}

View File

@ -1,6 +1,4 @@
import fs from 'node:fs/promises'
import fetch from 'node-fetch'
import FormData from 'form-data'
export async function readFile (doc: string): Promise<string> {
const buffer = await fs.readFile(doc)
@ -21,40 +19,3 @@ export function compareStrExact (a: string, b: string): boolean {
export function clean (s: string): string {
return s.replaceAll('\n', ' ').trim()
}
export async function uploadFile (
contents: string,
name: string,
type: string,
uploadURL: string,
token: string
): Promise<string> {
const data = new FormData()
const buffer = Buffer.from(contents, 'base64')
data.append(
'file',
Object.assign(buffer, {
lastModified: Date.now(),
name,
type
})
)
const resp = await fetch(uploadURL, {
method: 'POST',
headers: {
Authorization: `Bearer ${token}`
},
body: data
})
if (resp.status !== 200) {
if (resp.status === 413) {
throw new Error('File is too large')
} else {
throw Error(`Failed to upload file: ${resp.statusText}`)
}
}
return await resp.text()
}

View File

@ -20,6 +20,7 @@ import core, {
BackupClient,
Client as CoreClient,
Data,
MeasureContext,
Ref,
TxOperations,
generateId,
@ -27,19 +28,21 @@ import core, {
systemAccountEmail,
type Blob
} from '@hcengineering/core'
import { getMetadata } from '@hcengineering/platform'
import serverCore from '@hcengineering/server-core'
import { createClient, getTransactorEndpoint } from '@hcengineering/server-client'
import { generateToken } from '@hcengineering/server-token'
import { findAll, getOuterHTML } from 'domutils'
import { parseDocument } from 'htmlparser2'
import { createClient, getTransactorEndpoint } from '@hcengineering/server-client'
import { generateToken } from '@hcengineering/server-token'
import { Config } from './config'
import { ExtractedFile } from './extract/extract'
import { ExtractedSection } from './extract/sections'
import { compareStrExact, uploadFile } from './helpers'
import { compareStrExact } from './helpers'
export default async function importExtractedFile (config: Config, extractedFile: ExtractedFile): Promise<void> {
export default async function importExtractedFile (
ctx: MeasureContext,
config: Config,
extractedFile: ExtractedFile
): Promise<void> {
const { workspaceId } = config
const token = generateToken(systemAccountEmail, workspaceId)
const transactorUrl = await getTransactorEndpoint(token, 'external')
@ -53,7 +56,7 @@ export default async function importExtractedFile (config: Config, extractedFile
try {
const docId = await createDocument(txops, extractedFile, config)
const createdDoc = await txops.findOne(documents.class.Document, { _id: docId })
await createSections(txops, extractedFile, config, createdDoc)
await createSections(ctx, txops, extractedFile, config, createdDoc)
} finally {
await txops.close()
}
@ -199,6 +202,7 @@ async function createTemplateIfNotExist (
}
async function createSections (
ctx: MeasureContext,
txops: TxOperations,
extractedFile: ExtractedFile,
config: Config,
@ -263,7 +267,7 @@ async function createSections (
prevSection = existingSection
}
await processImages(txops, section, config)
await processImages(ctx, txops, section, config)
const collabSectionId =
existingSection != null && h.isDerived(existingSection._class, documents.class.CollaborativeDocumentSection)
@ -296,13 +300,16 @@ async function createSections (
}
}
export async function processImages (txops: TxOperations, section: ExtractedSection, config: Config): Promise<void> {
export async function processImages (
ctx: MeasureContext,
txops: TxOperations,
section: ExtractedSection,
config: Config
): Promise<void> {
const dom = parseDocument(section.content)
const imageNodes = findAll((n) => n.tagName === 'img', dom.children)
const { uploadURL, token, space } = config
const frontUrl = getMetadata(serverCore.metadata.FrontUrl) ?? ''
const fullUploadURL = `${frontUrl}${uploadURL}`
const { storageAdapter, workspaceId, uploadURL, space } = config
const imageUploads = imageNodes.map(async (img) => {
const src = img.attribs.src
@ -313,14 +320,16 @@ export async function processImages (txops: TxOperations, section: ExtractedSect
return
}
const fileContents = extracted[2]
const fileSize = Buffer.from(fileContents, 'base64').length
const fileContentsBase64 = extracted[2]
const fileContents = Buffer.from(fileContentsBase64, 'base64')
const fileSize = fileContents.length
const mimeType = extracted[1]
const ext = mimeType.split('/')[1]
const fileName = `${generateId()}.${ext}`
// upload
const uuid = await uploadFile(fileContents, fileName, mimeType, fullUploadURL, token)
const uuid = generateId()
await storageAdapter.put(ctx, workspaceId, uuid, fileContents, mimeType, fileSize)
// attachment
const attachmentId: Ref<Attachment> = generateId()

View File

@ -14,13 +14,14 @@
//
import { Employee } from '@hcengineering/contact'
import documents, { DocumentSpace } from '@hcengineering/controlled-documents'
import { Ref, getWorkspaceId, systemAccountEmail } from '@hcengineering/core'
import { MeasureMetricsContext, Ref, getWorkspaceId, systemAccountEmail } from '@hcengineering/core'
import { setMetadata } from '@hcengineering/platform'
import serverCore from '@hcengineering/server-core'
import serverClientPlugin from '@hcengineering/server-client'
import { type StorageAdapter } from '@hcengineering/server-core'
import { buildStorageFromConfig, storageConfigFromEnv } from '@hcengineering/server-storage'
import serverToken, { generateToken } from '@hcengineering/server-token'
import { program } from 'commander'
import serverClientPlugin from '@hcengineering/server-client'
import { importDoc } from './commands'
import { Config } from './config'
import { getBackend } from './convert/convert'
@ -29,6 +30,8 @@ import { getBackend } from './convert/convert'
* @public
*/
export function docImportTool (): void {
const ctx = new MeasureMetricsContext('doc-import-tool', {})
const serverSecret = process.env.SERVER_SECRET
if (serverSecret === undefined) {
console.error('please provide server secret')
@ -49,15 +52,24 @@ export function docImportTool (): void {
const uploadUrl = process.env.UPLOAD_URL ?? '/files'
const frontUrl = process.env.FRONT_URL
if (frontUrl === undefined) {
console.log('Please provide front url')
const mongodbUri = process.env.MONGO_URL
if (mongodbUri === undefined) {
console.log('Please provide mongodb url')
process.exit(1)
}
setMetadata(serverClientPlugin.metadata.Endpoint, accountUrl)
setMetadata(serverToken.metadata.Secret, serverSecret)
setMetadata(serverCore.metadata.FrontUrl, frontUrl)
async function withStorage (mongodbUri: string, f: (storageAdapter: StorageAdapter) => Promise<any>): Promise<void> {
const adapter = buildStorageFromConfig(storageConfigFromEnv(), mongodbUri)
try {
await f(adapter)
} catch (err: any) {
console.error(err)
}
await adapter.close()
}
program.version('0.0.1')
@ -79,7 +91,8 @@ export function docImportTool (): void {
cmd.spec
}, space: ${cmd.space}, backend: ${cmd.backend}`
)
try {
await withStorage(mongodbUri, async (storageAdapter) => {
const workspaceId = getWorkspaceId(workspace)
const config: Config = {
@ -90,14 +103,13 @@ export function docImportTool (): void {
specFile: cmd.spec,
space: cmd.space,
uploadURL: uploadUrl,
storageAdapter,
collaboratorApiURL: collaboratorApiUrl,
token: generateToken(systemAccountEmail, workspaceId)
}
await importDoc(config)
} catch (err: any) {
console.trace(err)
}
await importDoc(ctx, config)
})
}
)

View File

@ -30,7 +30,7 @@ export async function removeDocument (
params: RpcMethodParams
): Promise<RemoveDocumentResponse> {
const { documentId } = payload
const { hocuspocus, minio } = params
const { hocuspocus, storageAdapter } = params
const { workspaceId } = context
const document = hocuspocus.documents.get(documentId)
@ -40,11 +40,11 @@ export async function removeDocument (
}
const { collaborativeDoc } = parseDocumentId(documentId)
const { documentId: minioDocumentId } = collaborativeDocParse(collaborativeDoc)
const historyDocumentId = collaborativeHistoryDocId(minioDocumentId)
const { documentId: contentDocumentId } = collaborativeDocParse(collaborativeDoc)
const historyDocumentId = collaborativeHistoryDocId(contentDocumentId)
try {
await minio.remove(ctx, workspaceId, [minioDocumentId, historyDocumentId])
await storageAdapter.remove(ctx, workspaceId, [contentDocumentId, historyDocumentId])
} catch (err) {
ctx.error('failed to remove document', { documentId, error: err })
}

View File

@ -37,7 +37,7 @@ export async function takeSnapshot (
params: RpcMethodParams
): Promise<TakeSnapshotResponse> {
const { documentId, snapshotName, createdBy } = payload
const { hocuspocus, minio } = params
const { hocuspocus, storageAdapter } = params
const { workspaceId } = context
const version: YDocVersion = {
@ -48,7 +48,7 @@ export async function takeSnapshot (
}
const { collaborativeDoc } = parseDocumentId(documentId)
const { documentId: minioDocumentId, versionId } = collaborativeDocParse(collaborativeDoc)
const { documentId: contentDocumentId, versionId } = collaborativeDocParse(collaborativeDoc)
if (versionId !== CollaborativeDocVersionHead) {
throw new Error('invalid document version')
}
@ -58,11 +58,11 @@ export async function takeSnapshot (
})
try {
// load history document directly from minio
const historyDocumentId = collaborativeHistoryDocId(minioDocumentId)
// load history document directly from storage
const historyDocumentId = collaborativeHistoryDocId(contentDocumentId)
const yHistory =
(await ctx.with('yDocFromMinio', {}, async () => {
return await yDocFromStorage(ctx, minio, workspaceId, historyDocumentId)
(await ctx.with('yDocFromStorage', {}, async () => {
return await yDocFromStorage(ctx, storageAdapter, workspaceId, historyDocumentId)
})) ?? new YDoc()
await ctx.with('createYdocSnapshot', {}, async () => {
@ -71,8 +71,8 @@ export async function takeSnapshot (
})
})
await ctx.with('yDocToMinio', {}, async () => {
await yDocToStorage(ctx, minio, workspaceId, historyDocumentId, yHistory)
await ctx.with('yDocToStorage', {}, async () => {
await yDocToStorage(ctx, storageAdapter, workspaceId, historyDocumentId, yHistory)
})
return { ...version }

View File

@ -39,6 +39,6 @@ export type RpcMethod = (
export interface RpcMethodParams {
hocuspocus: Hocuspocus
minio: StorageAdapter
storageAdapter: StorageAdapter
transformer: Transformer
}

View File

@ -15,6 +15,8 @@
import { Analytics } from '@hcengineering/analytics'
import { MeasureContext, generateId, metricsAggregate } from '@hcengineering/core'
import type { MongoClientReference } from '@hcengineering/mongo'
import type { StorageAdapter } from '@hcengineering/server-core'
import { Token, decodeToken } from '@hcengineering/server-token'
import { ServerKit } from '@hcengineering/text'
import { Hocuspocus } from '@hocuspocus/server'
@ -24,8 +26,6 @@ import express from 'express'
import { IncomingMessage, createServer } from 'http'
import { WebSocket, WebSocketServer } from 'ws'
import type { MongoClientReference } from '@hcengineering/mongo'
import type { StorageAdapter } from '@hcengineering/server-core'
import { Config } from './config'
import { Context } from './context'
import { AuthenticationExtension } from './extensions/authentication'
@ -46,7 +46,7 @@ export type Shutdown = () => Promise<void>
export async function start (
ctx: MeasureContext,
config: Config,
minio: StorageAdapter,
storageAdapter: StorageAdapter,
mongoClient: MongoClientReference
): Promise<Shutdown> {
const port = config.Port
@ -116,7 +116,7 @@ export async function start (
}),
new StorageExtension({
ctx: extensionsCtx.newChild('storage', {}),
adapter: new PlatformStorageAdapter({ minio }, mongo, transformer)
adapter: new PlatformStorageAdapter(storageAdapter, mongo, transformer)
})
]
})
@ -179,7 +179,7 @@ export async function start (
await rpcCtx.with('/rpc', { method: request.method }, async (ctx) => {
try {
const response: RpcResponse = await rpcCtx.with(request.method, {}, async (ctx) => {
return await method(ctx, context, request.payload, { hocuspocus, minio, transformer })
return await method(ctx, context, request.payload, { hocuspocus, storageAdapter, transformer })
})
res.status(200).send(response)
} catch (err: any) {

View File

@ -42,11 +42,9 @@ import { Context } from '../context'
import { CollabStorageAdapter } from './adapter'
export type StorageAdapters = Record<string, StorageAdapter>
export class PlatformStorageAdapter implements CollabStorageAdapter {
constructor (
private readonly adapters: StorageAdapters,
private readonly storage: StorageAdapter,
private readonly mongodb: MongoClient,
private readonly transformer: Transformer
) {}
@ -149,27 +147,16 @@ export class PlatformStorageAdapter implements CollabStorageAdapter {
}
}
getStorageAdapter (storage: string): StorageAdapter {
const adapter = this.adapters[storage]
if (adapter === undefined) {
throw new Error(`unknown storage adapter ${storage}`)
}
return adapter
}
async loadDocumentFromStorage (
ctx: MeasureContext,
documentId: DocumentId,
context: Context
): Promise<YDoc | undefined> {
const { storage, collaborativeDoc } = parseDocumentId(documentId)
const adapter = this.getStorageAdapter(storage)
const { collaborativeDoc } = parseDocumentId(documentId)
return await ctx.with('load-document', { storage }, async (ctx) => {
return await ctx.with('load-document', {}, async (ctx) => {
return await withRetry(ctx, 5, async () => {
return await loadCollaborativeDoc(adapter, context.workspaceId, collaborativeDoc, ctx)
return await loadCollaborativeDoc(this.storage, context.workspaceId, collaborativeDoc, ctx)
})
})
}
@ -180,12 +167,11 @@ export class PlatformStorageAdapter implements CollabStorageAdapter {
document: YDoc,
context: Context
): Promise<void> {
const { storage, collaborativeDoc } = parseDocumentId(documentId)
const adapter = this.getStorageAdapter(storage)
const { collaborativeDoc } = parseDocumentId(documentId)
await ctx.with('save-document', {}, async (ctx) => {
await withRetry(ctx, 5, async () => {
await saveCollaborativeDoc(adapter, context.workspaceId, collaborativeDoc, document, ctx)
await saveCollaborativeDoc(this.storage, context.workspaceId, collaborativeDoc, document, ctx)
})
})
}
@ -197,8 +183,7 @@ export class PlatformStorageAdapter implements CollabStorageAdapter {
document: YDoc,
context: Context
): Promise<YDocVersion | undefined> {
const { storage, collaborativeDoc } = parseDocumentId(documentId)
const adapter = this.getStorageAdapter(storage)
const { collaborativeDoc } = parseDocumentId(documentId)
const { workspaceId } = context
@ -212,7 +197,7 @@ export class PlatformStorageAdapter implements CollabStorageAdapter {
}
await ctx.with('take-snapshot', {}, async (ctx) => {
await takeCollaborativeDocSnapshot(adapter, workspaceId, collaborativeDoc, document, yDocVersion, ctx)
await takeCollaborativeDocSnapshot(this.storage, workspaceId, collaborativeDoc, document, yDocVersion, ctx)
})
return yDocVersion

View File

@ -34,7 +34,6 @@
"@types/cors": "^2.8.12",
"@types/express": "^4.17.13",
"@types/node": "~20.11.16",
"@types/node-fetch": "~2.6.2",
"@types/ws": "^8.5.11",
"@typescript-eslint/eslint-plugin": "^6.11.0",
"@hcengineering/platform-rig": "^0.6.0",
@ -71,12 +70,10 @@
"express": "^4.19.2",
"fast-equals": "^5.0.1",
"file-api": "^0.10.4",
"form-data": "^4.0.0",
"googleapis": "^122.0.0",
"google-auth-library": "^8.0.2",
"jwt-simple": "^0.5.6",
"mongodb": "^6.8.0",
"node-fetch": "^2.6.6",
"ws": "^8.18.0"
}
}

View File

@ -19,7 +19,6 @@ interface Config {
MongoURI: string
MongoDB: string
AccountsURL: string
UploadUrl: string
ServiceID: string
Secret: string
Credentials: string
@ -34,7 +33,6 @@ const envMap: { [key in keyof Config]: string } = {
MongoDB: 'MONGO_DB',
AccountsURL: 'ACCOUNTS_URL',
UploadUrl: 'UPLOAD_URL',
ServiceID: 'SERVICE_ID',
Secret: 'SECRET',
Credentials: 'Credentials',
@ -50,7 +48,6 @@ const config: Config = (() => {
MongoDB: process.env[envMap.MongoDB] ?? 'calendar-service',
MongoURI: process.env[envMap.MongoURI],
AccountsURL: process.env[envMap.AccountsURL],
UploadUrl: process.env[envMap.UploadUrl],
ServiceID: process.env[envMap.ServiceID] ?? 'calendar-service',
Secret: process.env[envMap.Secret],
SystemEmail: process.env[envMap.SystemEmail] ?? 'anticrm@hc.engineering',

View File

@ -35,7 +35,6 @@
"@types/cors": "^2.8.12",
"@types/express": "^4.17.13",
"@types/node": "~20.11.16",
"@types/node-fetch": "~2.6.2",
"@types/ws": "^8.5.11",
"@typescript-eslint/eslint-plugin": "^6.11.0",
"@typescript-eslint/parser": "^6.11.0",
@ -51,7 +50,8 @@
"@types/jest": "^29.5.5",
"prettier": "^3.1.0",
"ts-node-dev": "^2.0.0",
"typescript": "^5.3.3"
"typescript": "^5.3.3",
"@types/uuid": "^8.3.1"
},
"dependencies": {
"@hcengineering/attachment": "^0.6.14",
@ -61,22 +61,23 @@
"@hcengineering/mongo": "^0.6.1",
"@hcengineering/core": "^0.6.32",
"@hcengineering/gmail": "^0.6.22",
"@hcengineering/server-token": "^0.6.11",
"@hcengineering/platform": "^0.6.11",
"@hcengineering/setting": "^0.6.17",
"@hcengineering/server-core": "^0.6.1",
"@hcengineering/server-client": "^0.6.0",
"@hcengineering/server-storage": "^0.6.0",
"@hcengineering/server-token": "^0.6.11",
"cors": "^2.8.5",
"dotenv": "~16.0.0",
"express": "^4.19.2",
"fast-equals": "^5.0.1",
"file-api": "^0.10.4",
"form-data": "^4.0.0",
"googleapis": "^122.0.0",
"google-auth-library": "^8.0.2",
"gaxios": "^5.0.1",
"jwt-simple": "^0.5.6",
"mongodb": "^6.8.0",
"node-fetch": "^2.6.6",
"ws": "^8.18.0"
"ws": "^8.18.0",
"uuid": "^8.3.2"
}
}

View File

@ -20,7 +20,6 @@ interface Config {
MongoURI: string
MongoDB: string
AccountsURL: string
UploadUrl: string
ServiceID: string
Secret: string
Credentials: string
@ -36,7 +35,6 @@ const envMap: { [key in keyof Config]: string } = {
MongoDB: 'MONGO_DB',
AccountsURL: 'ACCOUNTS_URL',
UploadUrl: 'UPLOAD_URL',
ServiceID: 'SERVICE_ID',
Secret: 'SECRET',
Credentials: 'Credentials',
@ -53,7 +51,6 @@ const config: Config = (() => {
MongoDB: process.env[envMap.MongoDB] ?? 'gmail-service',
MongoURI: process.env[envMap.MongoURI],
AccountsURL: process.env[envMap.AccountsURL],
UploadUrl: process.env[envMap.UploadUrl],
ServiceID: process.env[envMap.ServiceID] ?? 'gmail-service',
Secret: process.env[envMap.Secret],
SystemEmail: process.env[envMap.SystemEmail] ?? 'anticrm@hc.engineering',

View File

@ -16,11 +16,11 @@
import attachment, { Attachment } from '@hcengineering/attachment'
import core, {
AttachedData,
Blob,
Client,
Data,
Doc,
MeasureContext,
Ref,
Space,
Timestamp,
TxCollectionCUD,
TxCreateDoc,
@ -28,16 +28,16 @@ import core, {
TxOperations,
TxProcessor,
TxUpdateDoc,
Blob
WorkspaceId
} from '@hcengineering/core'
import gmail, { type Message, type NewMessage } from '@hcengineering/gmail'
import { type StorageAdapter } from '@hcengineering/server-core'
import setting from '@hcengineering/setting'
import FormData from 'form-data'
import type { GaxiosResponse } from 'gaxios'
import type { Credentials, OAuth2Client } from 'google-auth-library'
import { gmail_v1, google } from 'googleapis'
import type { Collection, Db } from 'mongodb'
import fetch from 'node-fetch'
import { v4 as uuid } from 'uuid'
import { arrayBufferToBase64, decode64, encode64 } from './base64'
import config from './config'
import { GmailController } from './gmailController'
@ -51,55 +51,6 @@ const SCOPES = ['https://www.googleapis.com/auth/gmail.modify']
const EMAIL_REGEX =
/(([^<>()[\]\\.,;:\s@"]+(\.[^<>()[\]\\.,;:\s@"]+)*)|(".+"))@((\[[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}])|(([a-zA-Z\-0-9]+\.)+[a-zA-Z]{2,}))/
async function uploadFile (
file: AttachedFile,
token: string,
opts?: { space: Ref<Space>, attachedTo: Ref<Doc> }
): Promise<string> {
const uploadUrl = config.UploadUrl
if (uploadUrl === undefined) {
throw Error('UploadURL is not defined')
}
const data = new FormData()
const buffer = Buffer.from(file.file, 'base64')
data.append(
'file',
Object.assign(buffer, {
lastModified: file.lastModified,
name: file.name,
type: file.type
})
)
const params =
opts !== undefined
? [
['space', opts.space],
['attachedTo', opts.attachedTo]
]
.filter((x): x is [string, Ref<any>] => x[1] !== undefined)
.map(([name, value]) => `${name}=${value}`)
.join('&')
: ''
const suffix = params === '' ? params : `?${params}`
const url = `${uploadUrl}${suffix}`
const resp = await fetch(url, {
method: 'POST',
headers: {
Authorization: 'Bearer ' + token
},
body: data
})
if (resp.status !== 200) {
throw Error(`Failed to upload file: ${resp.statusText}`)
}
return (await resp.text()) as Ref<Blob>
}
function makeHTMLBody (message: NewMessage, from: string): string {
const str = [
'Content-Type: text/html; charset="UTF-8"\n',
@ -207,10 +158,13 @@ export class GmailClient {
private readonly rateLimiter = new RateLimiter(1000, 200) // in fact 250, but let's make reserve
private constructor (
private readonly ctx: MeasureContext,
credentials: ProjectCredentials,
private readonly user: User,
mongo: Db,
client: Client,
private readonly workspaceId: WorkspaceId,
private readonly storageAdapter: StorageAdapter,
private readonly workspace: WorkspaceClient
) {
const { client_secret, client_id, redirect_uris } = credentials.web // eslint-disable-line
@ -222,13 +176,16 @@ export class GmailClient {
}
static async create (
ctx: MeasureContext,
credentials: ProjectCredentials,
user: User | Token,
mongo: Db,
client: Client,
workspace: WorkspaceClient
workspace: WorkspaceClient,
workspaceId: WorkspaceId,
storageAdapter: StorageAdapter
): Promise<GmailClient> {
const gmailClient = new GmailClient(credentials, user, mongo, client, workspace)
const gmailClient = new GmailClient(ctx, credentials, user, mongo, client, workspaceId, storageAdapter, workspace)
if (isToken(user)) {
console.log('Setting token while creating', user.workspace, user.userId, user)
await gmailClient.setToken(user)
@ -665,14 +622,15 @@ export class GmailClient {
if (currentAttachemtns.findIndex((p) => p.name === file.name && p.lastModified === file.lastModified) !== -1) {
return
}
const uuid = await uploadFile(file, this.user.token, { space: message.space, attachedTo: message._id })
const id = uuid()
const data: AttachedData<Attachment> = {
name: file.name,
file: uuid as any,
file: id as Ref<Blob>,
type: file.type ?? 'undefined',
size: file.size ?? Buffer.from(file.file, 'base64').length,
lastModified: file.lastModified
}
await this.storageAdapter.put(this.ctx, this.workspaceId, id, file.file, data.type, data.size)
await this.client.addCollection(
attachment.class.Attachment,
message.space,
@ -858,7 +816,8 @@ export class GmailClient {
}
private async makeAttachmentPart (attachment: Attachment): Promise<string[]> {
const data = await this.getAttachmentData(attachment.file)
const buffer = await this.storageAdapter.read(this.ctx, this.workspaceId, attachment.file)
const data = arrayBufferToBase64(Buffer.concat(buffer))
const res: string[] = []
res.push('--mail\n')
res.push(`Content-Type: ${attachment.type}\n`)
@ -870,16 +829,6 @@ export class GmailClient {
return res
}
private async getAttachmentData (fileId: string): Promise<string> {
const uploadUrl = config.UploadUrl
if (uploadUrl === undefined) {
throw Error('UploadURL is not defined')
}
const url = `${uploadUrl}?file=${fileId}&token=${this.user.token}`
const res = await fetch(url)
return arrayBufferToBase64(await res.arrayBuffer())
}
async close (): Promise<void> {
if (this.watchTimer !== undefined) clearInterval(this.watchTimer)
if (this.refreshTimer !== undefined) clearTimeout(this.refreshTimer)

View File

@ -13,6 +13,9 @@
// limitations under the License.
//
import { MeasureContext } from '@hcengineering/core'
import type { StorageAdapter } from '@hcengineering/server-core'
import { type Db } from 'mongodb'
import { decode64 } from './base64'
import config from './config'
@ -28,17 +31,27 @@ export class GmailController {
protected static _instance: GmailController
private constructor (private readonly mongo: Db) {
private constructor (
private readonly ctx: MeasureContext,
private readonly mongo: Db,
private readonly storageAdapter: StorageAdapter
) {
this.credentials = JSON.parse(config.Credentials)
GmailController._instance = this
}
static getGmailController (mongo?: Db): GmailController {
static create (ctx: MeasureContext, mongo: Db, storageAdapter: StorageAdapter): GmailController {
if (GmailController._instance !== undefined) {
throw new Error('GmailController already exists')
}
return new GmailController(ctx, mongo, storageAdapter)
}
static getGmailController (): GmailController {
if (GmailController._instance !== undefined) {
return GmailController._instance
}
if (mongo === undefined) throw new Error('GmailController not exist')
return new GmailController(mongo)
throw new Error('GmailController not exist')
}
async startAll (): Promise<void> {
@ -108,7 +121,7 @@ export class GmailController {
let res = this.workspaces.get(workspace)
if (res === undefined) {
try {
res = await WorkspaceClient.create(this.credentials, this.mongo, workspace)
res = await WorkspaceClient.create(this.ctx, this.credentials, this.mongo, this.storageAdapter, workspace)
this.workspaces.set(workspace, res)
} catch (err) {
console.error(`Couldn't create workspace worker for ${workspace}, reason: `, err)

View File

@ -14,8 +14,11 @@
// limitations under the License.
//
import { MeasureMetricsContext, newMetrics } from '@hcengineering/core'
import { setMetadata } from '@hcengineering/platform'
import serverClient from '@hcengineering/server-client'
import { type StorageConfiguration } from '@hcengineering/server-core'
import { buildStorageFromConfig, storageConfigFromEnv } from '@hcengineering/server-storage'
import serverToken, { decodeToken } from '@hcengineering/server-token'
import { type IncomingHttpHeaders } from 'http'
import { decode64 } from './base64'
@ -34,11 +37,17 @@ const extractToken = (header: IncomingHttpHeaders): any => {
}
export const main = async (): Promise<void> => {
const ctx = new MeasureMetricsContext('gmail', {}, {}, newMetrics())
setMetadata(serverClient.metadata.Endpoint, config.AccountsURL)
setMetadata(serverClient.metadata.UserAgent, config.ServiceID)
setMetadata(serverToken.metadata.Secret, config.Secret)
const storageConfig: StorageConfiguration = storageConfigFromEnv()
const storageAdapter = buildStorageFromConfig(storageConfig, config.MongoURI)
const db = await getDB()
const gmailController = GmailController.getGmailController(db)
const gmailController = GmailController.create(ctx, db, storageAdapter)
await gmailController.startAll()
const endpoints: Endpoint[] = [
{
@ -114,14 +123,15 @@ export const main = async (): Promise<void> => {
const server = listen(createServer(endpoints), config.Port)
const asyncClose = async (): Promise<void> => {
await gmailController.close()
await storageAdapter.close()
await closeDB()
}
const shutdown = (): void => {
server.close(() => {
void gmailController
.close()
.then(async () => {
await closeDB()
})
.then(() => process.exit())
void asyncClose().then(() => process.exit())
})
}

View File

@ -25,9 +25,11 @@ import core, {
type TxCreateDoc,
TxProcessor,
type TxRemoveDoc,
type TxUpdateDoc
type TxUpdateDoc,
MeasureContext
} from '@hcengineering/core'
import gmailP, { type NewMessage } from '@hcengineering/gmail'
import type { StorageAdapter } from '@hcengineering/server-core'
import { generateToken } from '@hcengineering/server-token'
import { type Db } from 'mongodb'
import { getClient } from './client'
@ -45,13 +47,21 @@ export class WorkspaceClient {
private readonly clients: Map<Ref<Account>, GmailClient> = new Map<Ref<Account>, GmailClient>()
private constructor (
private readonly ctx: MeasureContext,
private readonly credentials: ProjectCredentials,
private readonly mongo: Db,
private readonly storageAdapter: StorageAdapter,
private readonly workspace: string
) {}
static async create (credentials: ProjectCredentials, mongo: Db, workspace: string): Promise<WorkspaceClient> {
const instance = new WorkspaceClient(credentials, mongo, workspace)
static async create (
ctx: MeasureContext,
credentials: ProjectCredentials,
mongo: Db,
storageAdapter: StorageAdapter,
workspace: string
): Promise<WorkspaceClient> {
const instance = new WorkspaceClient(ctx, credentials, mongo, storageAdapter, workspace)
await instance.initClient(workspace)
return instance
}
@ -59,7 +69,16 @@ export class WorkspaceClient {
async createGmailClient (user: User): Promise<GmailClient> {
const current = this.getGmailClient(user.userId)
if (current !== undefined) return current
const newClient = await GmailClient.create(this.credentials, user, this.mongo, this.client, this)
const newClient = await GmailClient.create(
this.ctx,
this.credentials,
user,
this.mongo,
this.client,
this,
{ name: this.workspace, productId: '' },
this.storageAdapter
)
this.clients.set(user.userId, newClient)
return newClient
}

View File

@ -36,7 +36,6 @@
"@types/express": "^4.17.13",
"@types/mime": "^3.0.1",
"@types/node": "~20.11.16",
"@types/node-fetch": "~2.6.2",
"@types/ws": "^8.5.11",
"esbuild": "^0.20.0",
"prettier": "^3.1.0",
@ -53,7 +52,8 @@
"eslint-plugin-import": "^2.26.0",
"eslint-plugin-n": "^15.4.0",
"eslint-plugin-promise": "^6.1.1",
"eslint": "^8.54.0"
"eslint": "^8.54.0",
"@types/uuid": "^8.3.1"
},
"dependencies": {
"@hcengineering/attachment": "^0.6.14",
@ -64,20 +64,21 @@
"@hcengineering/platform": "^0.6.11",
"@hcengineering/setting": "^0.6.17",
"@hcengineering/telegram": "^0.6.21",
"@hcengineering/server-token": "^0.6.11",
"@hcengineering/server-core": "^0.6.1",
"@hcengineering/server-client": "^0.6.0",
"@hcengineering/server-storage": "^0.6.0",
"@hcengineering/server-token": "^0.6.11",
"@hcengineering/mongo": "^0.6.1",
"big-integer": "^1.6.51",
"dotenv": "~16.0.0",
"cors": "^2.8.5",
"express": "^4.19.2",
"form-data": "^4.0.0",
"htmlparser2": "^9.0.0",
"jwt-simple": "^0.5.6",
"mime": "^3.0.0",
"mongodb": "^6.8.0",
"node-fetch": "^2.6.6",
"telegram": "2.22.2",
"ws": "^8.18.0"
"ws": "^8.18.0",
"uuid": "^8.3.2"
}
}

View File

@ -9,7 +9,6 @@ interface Config {
MongoURI: string
MongoDB: string
UploadUrl: string
AccountsURL: string
ServiceID: string
Secret: string
@ -27,7 +26,6 @@ const envMap: { [key in keyof Config]: string } = {
MongoURI: 'MONGO_URI',
MongoDB: 'MONGO_DB',
UploadUrl: 'UPLOAD_URL',
AccountsURL: 'ACCOUNTS_URL',
ServiceID: 'SERVICE_ID',
Secret: 'SECRET',
@ -52,14 +50,7 @@ const defaults: Partial<Config> = {
Secret: undefined
}
const required: Array<keyof Config> = [
'TelegramApiID',
'TelegramApiHash',
'UploadUrl',
'MongoURI',
'AccountsURL',
'Secret'
]
const required: Array<keyof Config> = ['TelegramApiID', 'TelegramApiHash', 'MongoURI', 'AccountsURL', 'Secret']
const mergeConfigs = <T>(defaults: Partial<T>, params: Partial<T>): T => {
const result = { ...defaults }
@ -81,7 +72,6 @@ const config = (() => {
TelegramApiID: parseNumber(process.env[envMap.TelegramApiID]),
TelegramApiHash: process.env[envMap.TelegramApiHash],
TelegramAuthTTL: ttl === undefined ? ttl : ttl * 1000,
UploadUrl: process.env[envMap.UploadUrl],
MongoDB: process.env[envMap.MongoDB],
MongoURI: process.env[envMap.MongoURI],
AccountsURL: process.env[envMap.AccountsURL],

View File

@ -3,8 +3,11 @@ import { PlatformWorker } from './platform'
import { createServer, Handler, listen } from './server'
import { telegram } from './telegram'
import { MeasureMetricsContext, newMetrics } from '@hcengineering/core'
import { setMetadata } from '@hcengineering/platform'
import serverClient from '@hcengineering/server-client'
import { type StorageConfiguration } from '@hcengineering/server-core'
import { buildStorageFromConfig, storageConfigFromEnv } from '@hcengineering/server-storage'
import serverToken, { decodeToken, type Token } from '@hcengineering/server-token'
import config from './config'
@ -17,11 +20,16 @@ const extractToken = (header: IncomingHttpHeaders): Token | undefined => {
}
export const main = async (): Promise<void> => {
const ctx = new MeasureMetricsContext('telegram', {}, {}, newMetrics())
setMetadata(serverClient.metadata.Endpoint, config.AccountsURL)
setMetadata(serverClient.metadata.UserAgent, config.ServiceID)
setMetadata(serverToken.metadata.Secret, config.Secret)
const platformWorker = await PlatformWorker.create()
const storageConfig: StorageConfiguration = storageConfigFromEnv()
const storageAdapter = buildStorageFromConfig(storageConfig, config.MongoURI)
const platformWorker = await PlatformWorker.create(ctx, storageAdapter)
const endpoints: Array<[string, Handler]> = [
[
'/signin',
@ -187,10 +195,15 @@ export const main = async (): Promise<void> => {
const server = listen(createServer(endpoints), config.Port, config.Host)
const asyncClose = async (): Promise<void> => {
await platformWorker.close()
await storageAdapter.close()
}
const shutdown = (): void => {
server.close()
void Promise.all([platformWorker.close()]).then(() => process.exit())
void asyncClose().then(() => process.exit())
}
process.on('SIGINT', shutdown)

View File

@ -2,9 +2,13 @@ import type { Collection } from 'mongodb'
import { getDB } from './storage'
import { LastMsgRecord, TgUser, User, UserRecord, WorkspaceChannel } from './types'
import { WorkspaceWorker } from './workspace'
import { StorageAdapter } from '@hcengineering/server-core'
import { MeasureContext } from '@hcengineering/core'
export class PlatformWorker {
private constructor (
private readonly ctx: MeasureContext,
private readonly storageAdapter: StorageAdapter,
private readonly clientMap: Map<string, WorkspaceWorker>,
private readonly storage: Collection<UserRecord>
) {}
@ -29,7 +33,14 @@ export class PlatformWorker {
if (wsWorker === undefined) {
const [userStorage, lastMsgStorage, channelStorage] = await PlatformWorker.createStorages()
wsWorker = await WorkspaceWorker.create(workspace, userStorage, lastMsgStorage, channelStorage)
wsWorker = await WorkspaceWorker.create(
this.ctx,
this.storageAdapter,
workspace,
userStorage,
lastMsgStorage,
channelStorage
)
this.clientMap.set(workspace, wsWorker)
}
@ -80,13 +91,20 @@ export class PlatformWorker {
return [userStorage, lastMsgStorage, channelStorage]
}
static async create (): Promise<PlatformWorker> {
static async create (ctx: MeasureContext, storageAdapter: StorageAdapter): Promise<PlatformWorker> {
const [userStorage, lastMsgStorage, channelStorage] = await PlatformWorker.createStorages()
const workspaces = new Set((await userStorage.find().toArray()).map((p) => p.workspace))
const clients: Array<[string, WorkspaceWorker]> = []
for (const workspace of workspaces) {
try {
const worker = await WorkspaceWorker.create(workspace, userStorage, lastMsgStorage, channelStorage)
const worker = await WorkspaceWorker.create(
ctx,
storageAdapter,
workspace,
userStorage,
lastMsgStorage,
channelStorage
)
clients.push([workspace, worker])
void worker.checkUsers()
} catch (e) {
@ -97,7 +115,7 @@ export class PlatformWorker {
const res = clients.filter((client): client is [string, WorkspaceWorker] => client !== undefined)
const worker = new PlatformWorker(new Map(res), userStorage)
const worker = new PlatformWorker(ctx, storageAdapter, new Map(res), userStorage)
return worker
}

View File

@ -1,10 +1,8 @@
import client, { ClientSocket } from '@hcengineering/client'
import { Client, Doc, Ref, Space } from '@hcengineering/core'
import { Client } from '@hcengineering/core'
import { setMetadata } from '@hcengineering/platform'
import { createClient, getTransactorEndpoint } from '@hcengineering/server-client'
import FormData from 'form-data'
import mime from 'mime'
import fetch from 'node-fetch'
import { Api } from 'telegram'
import WebSocket from 'ws'
import config from './config'
@ -58,55 +56,6 @@ export async function getFiles (msg: Api.Message): Promise<AttachedFile[]> {
return [obj]
}
export async function uploadFile (
file: AttachedFile,
token: string,
opts?: { space: Ref<Space>, attachedTo: Ref<Doc> }
): Promise<string> {
const uploadUrl = config.UploadUrl
if (uploadUrl === undefined) {
throw Error('UploadURL is not defined')
}
const data = new FormData()
data.append(
'file',
Object.assign(file.file, {
lastModified: file.lastModified,
size: file.size,
name: file.name,
type: file.type
})
)
const params =
opts !== undefined
? [
['space', opts.space],
['attachedTo', opts.attachedTo]
]
.filter((x): x is [string, Ref<any>] => x[1] !== undefined)
.map(([name, value]) => `${name}=${value}`)
.join('&')
: ''
const suffix = params === '' ? params : `?${params}`
const url = `${uploadUrl}${suffix}`
const resp = await fetch(url, {
method: 'POST',
headers: {
Authorization: 'Bearer ' + token
},
body: data
})
if (resp.status !== 200) {
throw Error(`Failed to upload file: ${resp.statusText}`)
}
return await resp.text()
}
export async function createPlatformClient (token: string): Promise<Client> {
setMetadata(client.metadata.ClientSocketFactory, (url) => {
return new WebSocket(url, {

View File

@ -14,6 +14,7 @@ import core, {
Client,
Doc,
Hierarchy,
MeasureContext,
Ref,
Tx,
TxCollectionCUD,
@ -24,19 +25,20 @@ import core, {
TxRemoveDoc,
TxUpdateDoc
} from '@hcengineering/core'
import type { StorageAdapter } from '@hcengineering/server-core'
import { generateToken } from '@hcengineering/server-token'
import settingP from '@hcengineering/setting'
import telegramP, { NewTelegramMessage } from '@hcengineering/telegram'
import type { Collection } from 'mongodb'
import fetch from 'node-fetch'
import { Api } from 'telegram'
import { v4 as uuid } from 'uuid'
import config from './config'
import { platformToTelegram, telegramToPlatform } from './markup'
import { MsgQueue } from './queue'
import type { TelegramConnectionInterface } from './telegram'
import { telegram } from './telegram'
import { Event, LastMsgRecord, TelegramMessage, TgUser, UserRecord, WorkspaceChannel } from './types'
import { createPlatformClient, getFiles, normalizeValue, uploadFile } from './utils'
import { createPlatformClient, getFiles, normalizeValue } from './utils'
export class WorkspaceWorker {
private readonly clients = new Map<
@ -52,8 +54,9 @@ export class WorkspaceWorker {
private readonly hierarchy: Hierarchy
private constructor (
private readonly ctx: MeasureContext,
private readonly client: Client,
private readonly token: string,
private readonly storageAdapter: StorageAdapter,
private readonly workspace: string,
private readonly userStorage: Collection<UserRecord>,
private readonly lastMsgStorage: Collection<LastMsgRecord>,
@ -150,6 +153,8 @@ export class WorkspaceWorker {
}
static async create (
ctx: MeasureContext,
storageAdapter: StorageAdapter,
workspace: string,
userStorage: Collection<UserRecord>,
lastMsgStorage: Collection<LastMsgRecord>,
@ -158,7 +163,15 @@ export class WorkspaceWorker {
const token = generateToken(config.SystemEmail, { name: workspace, productId: '' })
const client = await createPlatformClient(token)
const worker = new WorkspaceWorker(client, token, workspace, userStorage, lastMsgStorage, channelsStorage)
const worker = new WorkspaceWorker(
ctx,
client,
storageAdapter,
workspace,
userStorage,
lastMsgStorage,
channelsStorage
)
await worker.init()
return worker
}
@ -631,7 +644,8 @@ export class WorkspaceWorker {
const attachments = await this.client.findAll(attachment.class.Attachment, { attachedTo: msg._id })
const res: Buffer[] = []
for (const attachment of attachments) {
const buffer = await this.getAttachmentData(attachment.file)
const chunks = await this.storageAdapter.read(this.ctx, { name: this.workspace, productId: '' }, attachment.file)
const buffer = Buffer.concat(chunks)
if (buffer.length > 0) {
res.push(
Object.assign(buffer, {
@ -653,8 +667,16 @@ export class WorkspaceWorker {
const files = await getFiles(event.msg)
for (const file of files) {
try {
const id = uuid()
file.size = file.size ?? file.file.length
const uuid = await uploadFile(file, this.token, { space: msg.space, attachedTo: msg._id })
await this.storageAdapter.put(
this.ctx,
{ name: this.workspace, productId: '' },
id,
file.file,
file.type,
file.size
)
const tx = factory.createTxCollectionCUD<TelegramMessage, Attachment>(
msg._class,
msg._id,
@ -662,7 +684,7 @@ export class WorkspaceWorker {
'attachments',
factory.createTxCreateDoc<Attachment>(attachment.class.Attachment, msg.space, {
name: file.name,
file: uuid as Ref<Blob>,
file: id as Ref<Blob>,
type: file.type,
size: file.size,
lastModified: file.lastModified,
@ -681,17 +703,6 @@ export class WorkspaceWorker {
}
}
private async getAttachmentData (fileId: string): Promise<Buffer> {
const uploadUrl = config.UploadUrl
if (uploadUrl === undefined) {
throw Error('UploadURL is not defined')
}
const url = `${uploadUrl}?file=${fileId}&token=${this.token}`
const res = await fetch(url)
const buffer = await res.buffer()
return buffer
}
// #endregion
// #endregion