feat: datalake worker initial version (#6952)

Signed-off-by: Alexander Onnikov <Alexander.Onnikov@xored.com>
Alexander Onnikov 2024-10-18 19:17:40 +07:00 committed by GitHub
parent 8a07a4581f
commit 913849af82
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
49 changed files with 2007 additions and 289 deletions

View File

@ -155,6 +155,9 @@ dependencies:
'@rush-temp/cloud-branding':
specifier: file:./projects/cloud-branding.tgz
version: file:projects/cloud-branding.tgz(@types/node@20.11.19)(bufferutil@4.0.8)(esbuild@0.20.1)(ts-node@10.9.2)(utf-8-validate@6.0.4)
'@rush-temp/cloud-datalake':
specifier: file:./projects/cloud-datalake.tgz
version: file:projects/cloud-datalake.tgz(@types/node@20.11.19)(bufferutil@4.0.8)(esbuild@0.20.1)(ts-node@10.9.2)(utf-8-validate@6.0.4)
'@rush-temp/collaboration':
specifier: file:./projects/collaboration.tgz
version: file:projects/collaboration.tgz(esbuild@0.20.1)(ts-node@10.9.2)
@ -1394,6 +1397,9 @@ dependencies:
aws-sdk:
specifier: ^2.1423.0
version: 2.1664.0
aws4fetch:
specifier: ^1.0.20
version: 1.0.20
base64-js:
specifier: ^1.5.1
version: 1.5.1
@ -1748,6 +1754,9 @@ dependencies:
postcss-loader:
specifier: ^7.0.2
version: 7.3.4(postcss@8.4.35)(typescript@5.3.3)(webpack@5.90.3)
postgres:
specifier: ^3.4.4
version: 3.4.4
posthog-js:
specifier: ~1.122.0
version: 1.122.0
@ -11374,6 +11383,10 @@ packages:
xml2js: 0.6.2
dev: false
/aws4fetch@1.0.20:
resolution: {integrity: sha512-/djoAN709iY65ETD6LKCtyyEI04XIBP5xVvfmNxsEP0uJB5tyaGBztSryRr4HqMStr9R06PisQE7m9zDTXKu6g==}
dev: false
/axobject-query@4.0.0:
resolution: {integrity: sha512-+60uv1hiVFhHZeO+Lz0RYzsVHy5Wr1ayX0mwda9KPDVLNJgZ1T9Ny7VmFbLDzxsH0D87I86vgj3gFrjTJUYznw==}
dependencies:
@ -11548,6 +11561,13 @@ packages:
dev: false
optional: true
/base32-encode@2.0.0:
resolution: {integrity: sha512-mlmkfc2WqdDtMl/id4qm3A7RjW6jxcbAoMjdRmsPiwQP0ufD4oXItYMnPgVHe80lnAIy+1xwzhHE1s4FoIceSw==}
engines: {node: ^12.20.0 || ^14.13.1 || >=16.0.0}
dependencies:
to-data-view: 2.0.0
dev: false
/base64-js@1.5.1:
resolution: {integrity: sha512-AKpaYlHn8t4SVbOHCy+b5+KKgvR4vrsD8vbvrbiQJps7fKDTkjkDry6ji0rUJjC0kzbNePLwzxq8iypo41qeWA==}
dev: false
@ -20790,6 +20810,11 @@ packages:
resolution: {integrity: sha512-i/hbxIE9803Alj/6ytL7UHQxRvZkI9O4Sy+J3HGc4F4oo/2eQAjTSNJ0bfxyse3bH0nuVesCk+3IRLaMtG3H6w==}
dev: false
/postgres@3.4.4:
resolution: {integrity: sha512-IbyN+9KslkqcXa8AO9fxpk97PA4pzewvpi2B3Dwy9u4zpV32QicaEdgmF3eSQUzdRk7ttDHQejNgAEr4XoeH4A==}
engines: {node: '>=12'}
dev: false
/posthog-js@1.122.0:
resolution: {integrity: sha512-+8R2/nLaWyI5Jp2Ly7L52qcgDFU3xryyoNG52DPJ8dlGnagphxIc0mLNGurgyKeeTGycsOsuOIP4dtofv3ZoBA==}
deprecated: This version of posthog-js is deprecated, please update posthog-js, and do not use this version! Check out our JS docs at https://posthog.com/docs/libraries/js
@ -23505,6 +23530,11 @@ packages:
resolution: {integrity: sha512-3f0uOEAQwIqGuWW2MVzYg8fV/QNnc/IpuJNG837rLuczAaLVHslWHZQj4IGiEl5Hs3kkbhwL9Ab7Hrsmuj+Smw==}
dev: false
/to-data-view@2.0.0:
resolution: {integrity: sha512-RGEM5KqlPHr+WVTPmGNAXNeFEmsBnlkxXaIfEpUYV0AST2Z5W1EGq9L/MENFrMMmL2WQr1wjkmZy/M92eKhjYA==}
engines: {node: ^12.20.0 || ^14.13.1 || >=16.0.0}
dev: false
/to-fast-properties@2.0.0:
resolution: {integrity: sha512-/OaKK0xYrs3DmxRYqL/yDc+FxFUVYhDlXMhRmv3z915w2HF1tnN1omB354j8VUGO/hbRzyD6Y3sA7v7GS/ceog==}
engines: {node: '>=4'}
@ -26627,6 +26657,44 @@ packages:
- utf-8-validate
dev: false
file:projects/cloud-datalake.tgz(@types/node@20.11.19)(bufferutil@4.0.8)(esbuild@0.20.1)(ts-node@10.9.2)(utf-8-validate@6.0.4):
resolution: {integrity: sha512-AA2lTsmPKPeYA1MTwIscZFRO40m9Ctc59Er2x8VRLNBBt4mQ01b1CCay4VFVPWYxAzh+Ru9RoUIB7lS+m8sj9Q==, tarball: file:projects/cloud-datalake.tgz}
id: file:projects/cloud-datalake.tgz
name: '@rush-temp/cloud-datalake'
version: 0.0.0
dependencies:
'@cloudflare/workers-types': 4.20241004.0
'@types/jest': 29.5.12
'@typescript-eslint/eslint-plugin': 6.21.0(@typescript-eslint/parser@6.21.0)(eslint@8.56.0)(typescript@5.6.2)
'@typescript-eslint/parser': 6.21.0(eslint@8.56.0)(typescript@5.6.2)
aws4fetch: 1.0.20
base32-encode: 2.0.0
eslint: 8.56.0
eslint-config-standard-with-typescript: 40.0.0(@typescript-eslint/eslint-plugin@6.21.0)(eslint-plugin-import@2.29.1)(eslint-plugin-n@15.7.0)(eslint-plugin-promise@6.1.1)(eslint@8.56.0)(typescript@5.6.2)
eslint-plugin-import: 2.29.1(eslint@8.56.0)
eslint-plugin-n: 15.7.0(eslint@8.56.0)
eslint-plugin-promise: 6.1.1(eslint@8.56.0)
itty-router: 5.0.18
jest: 29.7.0(@types/node@20.11.19)(ts-node@10.9.2)
postgres: 3.4.4
prettier: 3.2.5
ts-jest: 29.1.2(esbuild@0.20.1)(jest@29.7.0)(typescript@5.6.2)
typescript: 5.6.2
wrangler: 3.80.2(@cloudflare/workers-types@4.20241004.0)(bufferutil@4.0.8)(utf-8-validate@6.0.4)
transitivePeerDependencies:
- '@babel/core'
- '@jest/types'
- '@types/node'
- babel-jest
- babel-plugin-macros
- bufferutil
- esbuild
- node-notifier
- supports-color
- ts-node
- utf-8-validate
dev: false
file:projects/collaboration.tgz(esbuild@0.20.1)(ts-node@10.9.2):
resolution: {integrity: sha512-krhgq1XiDnWKIP/HUM8VQgEzXdxLNfDf68lZgDl/Yl2tEFUu8yLYpzd1qWVMkl8N0dXyGts+DEFC7Ntns48lgA==, tarball: file:projects/collaboration.tgz}
id: file:projects/collaboration.tgz
@ -27018,6 +27086,7 @@ packages:
name: '@rush-temp/datalake'
version: 0.0.0
dependencies:
'@aws-sdk/client-s3': 3.577.0
'@types/jest': 29.5.12
'@types/node': 20.11.19
'@types/node-fetch': 2.6.11

View File

@ -96,7 +96,7 @@ import '@hcengineering/analytics-collector-assets'
import '@hcengineering/text-editor-assets'
import { coreId } from '@hcengineering/core'
import presentation, { parsePreviewConfig, presentationId } from '@hcengineering/presentation'
import presentation, { parsePreviewConfig, parseUploadConfig, presentationId } from '@hcengineering/presentation'
import textEditor, { textEditorId } from '@hcengineering/text-editor'
import love, { loveId } from '@hcengineering/love'
import print, { printId } from '@hcengineering/print'
@ -205,6 +205,7 @@ export async function configurePlatform (): Promise<void> {
setMetadata(presentation.metadata.FilesURL, config.FILES_URL)
setMetadata(presentation.metadata.CollaboratorUrl, config.COLLABORATOR_URL)
setMetadata(presentation.metadata.PreviewConfig, parsePreviewConfig(config.PREVIEW_CONFIG))
setMetadata(presentation.metadata.UploadConfig, parseUploadConfig(config.UPLOAD_CONFIG, config.UPLOAD_URL))
setMetadata(presentation.metadata.FrontUrl, config.FRONT_URL)
setMetadata(textEditor.metadata.Collaborator, config.COLLABORATOR ?? '')

View File

@ -30,6 +30,7 @@ export interface Config {
AI_URL?:string
BRANDING_URL?: string
PREVIEW_CONFIG: string
UPLOAD_CONFIG: string
DESKTOP_UPDATES_URL?: string
DESKTOP_UPDATES_CHANNEL?: string
TELEGRAM_BOT_URL?: string

View File

@ -108,7 +108,12 @@ import github, { githubId } from '@hcengineering/github'
import '@hcengineering/github-assets'
import { coreId } from '@hcengineering/core'
import presentation, { loadServerConfig, parsePreviewConfig, presentationId } from '@hcengineering/presentation'
import presentation, {
loadServerConfig,
parsePreviewConfig,
parseUploadConfig,
presentationId
} from '@hcengineering/presentation'
import { setMetadata } from '@hcengineering/platform'
import { setDefaultLanguage, initThemeStore } from '@hcengineering/theme'
@ -150,6 +155,7 @@ export interface Config {
// Could be defined for dev environment
FRONT_URL?: string
PREVIEW_CONFIG?: string
UPLOAD_CONFIG?: string
}
export interface Branding {
@ -292,6 +298,7 @@ export async function configurePlatform() {
setMetadata(presentation.metadata.FrontUrl, config.FRONT_URL)
setMetadata(presentation.metadata.PreviewConfig, parsePreviewConfig(config.PREVIEW_CONFIG))
setMetadata(presentation.metadata.UploadConfig, parseUploadConfig(config.UPLOAD_CONFIG, config.UPLOAD_URL))
setMetadata(textEditor.metadata.Collaborator, config.COLLABORATOR)

View File

@ -13,12 +13,30 @@
// limitations under the License.
//
import { concatLink, type Blob, type Ref } from '@hcengineering/core'
import { concatLink, type Blob as PlatformBlob, type Ref } from '@hcengineering/core'
import { PlatformError, Severity, Status, getMetadata } from '@hcengineering/platform'
import { v4 as uuid } from 'uuid'
import plugin from './plugin'
export type FileUploadMethod = 'form-data' | 'signed-url'
export interface UploadConfig {
'form-data': {
url: string
}
'signed-url'?: {
url: string
size: number
}
}
export interface FileUploadParams {
method: FileUploadMethod
url: string
headers: Record<string, string>
}
interface FileUploadError {
key: string
error: string
@ -34,6 +52,46 @@ type FileUploadResult = FileUploadSuccess | FileUploadError
const defaultUploadUrl = '/files'
const defaultFilesUrl = '/files/:workspace/:filename?file=:blobId&workspace=:workspace'
function parseInt (value: string, fallback: number): number {
const number = Number.parseInt(value)
return Number.isInteger(number) ? number : fallback
}
export function parseUploadConfig (config: string, uploadUrl: string): UploadConfig {
const uploadConfig: UploadConfig = {
'form-data': { url: uploadUrl },
'signed-url': undefined
}
if (config !== undefined) {
const configs = config.split(';')
for (const c of configs) {
if (c === '') {
continue
}
const [key, size, url] = c.split('|')
if (url === undefined || url === '') {
throw new Error(`Bad upload config: ${c}`)
}
if (key === 'form-data') {
uploadConfig['form-data'] = { url }
} else if (key === 'signed-url') {
uploadConfig['signed-url'] = {
url,
size: parseInt(size, 0) * 1024 * 1024
}
} else {
throw new Error(`Unknown upload config key: ${key}`)
}
}
}
return uploadConfig
}
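
For illustration, a minimal sketch of how a config string parses; entries use the `key|size|url` format separated by `;`, size is in MiB and only meaningful for the signed-url entry, and the URLs below are placeholders, not values shipped in this change:

```ts
const config = parseUploadConfig(
  'form-data||/files/:workspace;signed-url|100|/upload/signed-url/:workspace/:blobId',
  '/files'
)
// config['form-data'] -> { url: '/files/:workspace' }
// config['signed-url'] -> { url: '/upload/signed-url/:workspace/:blobId', size: 100 * 1024 * 1024 }
```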
function getFilesUrl (): string {
const filesUrl = getMetadata(plugin.metadata.FilesURL) ?? defaultFilesUrl
const frontUrl = getMetadata(plugin.metadata.FrontUrl) ?? window.location.origin
@ -61,6 +119,42 @@ export function getUploadUrl (): string {
return template.replaceAll(':workspace', encodeURIComponent(getCurrentWorkspaceId()))
}
function getUploadConfig (): UploadConfig {
return getMetadata<UploadConfig>(plugin.metadata.UploadConfig) ?? { 'form-data': { url: getUploadUrl() } }
}
function getFileUploadMethod (blob: Blob): { method: FileUploadMethod, url: string } {
const config = getUploadConfig()
const signedUrl = config['signed-url']
if (signedUrl !== undefined && signedUrl.size < blob.size) {
return { method: 'signed-url', url: signedUrl.url }
}
return { method: 'form-data', url: config['form-data'].url }
}
/**
* @public
*/
export function getFileUploadParams (blobId: string, blob: Blob): FileUploadParams {
const workspaceId = encodeURIComponent(getCurrentWorkspaceId())
const fileId = encodeURIComponent(blobId)
const { method, url: urlTemplate } = getFileUploadMethod(blob)
const url = urlTemplate.replaceAll(':workspace', workspaceId).replaceAll(':blobId', fileId)
const headers: Record<string, string> =
method !== 'signed-url'
? {
Authorization: 'Bearer ' + (getMetadata(plugin.metadata.Token) as string)
}
: {}
return { method, url, headers }
}
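
Given such a config, the upload method is chosen purely by file size; a hypothetical trace, reusing the placeholder config and 100 MiB threshold from the sketch above:

```ts
const params = getFileUploadParams('my-blob-id', file)
// file.size <= 100 MiB -> { method: 'form-data',  url: '/files/<workspace>', headers: { Authorization: 'Bearer <token>' } }
// file.size >  100 MiB -> { method: 'signed-url', url: '/upload/signed-url/<workspace>/my-blob-id', headers: {} }
```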
/**
* @public
*/
@ -79,12 +173,40 @@ export function getFileUrl (file: string, filename?: string): string {
/**
* @public
*/
export async function uploadFile (file: File): Promise<Ref<Blob>> {
const uploadUrl = getUploadUrl()
export async function uploadFile (file: File): Promise<Ref<PlatformBlob>> {
const id = generateFileId()
const params = getFileUploadParams(id, file)
if (params.method === 'signed-url') {
await uploadFileWithSignedUrl(file, id, params.url)
} else {
await uploadFileWithFormData(file, id, params.url)
}
return id as Ref<PlatformBlob>
}
/**
* @public
*/
export async function deleteFile (id: string): Promise<void> {
const fileUrl = getFileUrl(id)
const resp = await fetch(fileUrl, {
method: 'DELETE',
headers: {
Authorization: 'Bearer ' + (getMetadata(plugin.metadata.Token) as string)
}
})
if (resp.status !== 200) {
throw new Error('Failed to delete file')
}
}
async function uploadFileWithFormData (file: File, uuid: string, uploadUrl: string): Promise<void> {
const data = new FormData()
data.append('file', file, id)
data.append('file', file, uuid)
const resp = await fetch(uploadUrl, {
method: 'POST',
@ -110,24 +232,54 @@ export async function uploadFile (file: File): Promise<Ref<Blob>> {
if ('error' in result[0]) {
throw Error(`Failed to upload file: ${result[0].error}`)
}
return id as Ref<Blob>
}
/**
* @public
*/
export async function deleteFile (id: string): Promise<void> {
const fileUrl = getFileUrl(id)
const resp = await fetch(fileUrl, {
method: 'DELETE',
async function uploadFileWithSignedUrl (file: File, uuid: string, uploadUrl: string): Promise<void> {
const response = await fetch(uploadUrl, {
method: 'POST',
headers: {
Authorization: 'Bearer ' + (getMetadata(plugin.metadata.Token) as string)
}
})
if (resp.status !== 200) {
throw new Error('Failed to delete file')
if (!response.ok) {
throw Error(`Failed to generate signed upload URL: ${response.statusText}`)
}
const signedUrl = await response.text()
if (signedUrl === undefined || signedUrl === '') {
throw Error('Missing signed upload URL')
}
try {
const response = await fetch(signedUrl, {
body: file,
method: 'PUT',
headers: {
'Content-Type': file.type,
'Content-Length': file.size.toString(),
'x-amz-meta-last-modified': file.lastModified.toString()
}
})
if (!response.ok) {
throw Error(`Failed to upload file: ${response.statusText}`)
}
// confirm the completed upload so the server can finalize the blob
await fetch(uploadUrl, {
method: 'PUT',
headers: {
Authorization: 'Bearer ' + (getMetadata(plugin.metadata.Token) as string)
}
})
} catch (err) {
// abort the upload and surface the error to the caller
await fetch(uploadUrl, {
method: 'DELETE',
headers: {
Authorization: 'Bearer ' + (getMetadata(plugin.metadata.Token) as string)
}
})
throw err
}
}

View File

@ -43,6 +43,7 @@ import {
type InstantTransactions,
type ObjectSearchCategory
} from './types'
import { type UploadConfig } from './file'
/**
* @public
@ -138,6 +139,7 @@ export default plugin(presentationId, {
Workspace: '' as Metadata<string>,
WorkspaceId: '' as Metadata<string>,
FrontUrl: '' as Asset,
UploadConfig: '' as Metadata<UploadConfig>,
PreviewConfig: '' as Metadata<PreviewConfig | undefined>,
ClientHook: '' as Metadata<ClientHook>,
SessionId: '' as Metadata<string>

View File

@ -35,7 +35,7 @@
getModelRefActions
} from '@hcengineering/text-editor-resources'
import { AnySvelteComponent, getEventPositionElement, getPopupPositionElement, navigate } from '@hcengineering/ui'
import { uploadFiles } from '@hcengineering/uploader'
import { type FileUploadCallbackParams, uploadFiles } from '@hcengineering/uploader'
import view from '@hcengineering/view'
import { getCollaborationUser, getObjectId, getObjectLinkFragment } from '@hcengineering/view-resources'
import { Analytics } from '@hcengineering/analytics'
@ -135,14 +135,7 @@
progress = true
await uploadFiles(
list,
{ objectId: object._id, objectClass: object._class },
{},
async (uuid, name, file, path, metadata) => {
await createAttachment(uuid, name, file, metadata)
}
)
await uploadFiles(list, { onFileUploaded })
inputFile.value = ''
progress = false
@ -151,14 +144,7 @@
async function attachFiles (files: File[] | FileList): Promise<void> {
progress = true
if (files.length > 0) {
await uploadFiles(
files,
{ objectId: object._id, objectClass: object._class },
{},
async (uuid, name, file, path, metadata) => {
await createAttachment(uuid, name, file, metadata)
}
)
await uploadFiles(files, { onFileUploaded })
}
progress = false
}
@ -174,6 +160,10 @@
}
}
async function onFileUploaded ({ uuid, name, file, metadata }: FileUploadCallbackParams): Promise<void> {
await createAttachment(uuid, name, file, metadata)
}
async function createAttachment (
uuid: Ref<Blob>,
name: string,

View File

@ -13,7 +13,7 @@
// limitations under the License.
-->
<script lang="ts">
import { Attachment } from '@hcengineering/attachment'
import { Attachment, BlobMetadata } from '@hcengineering/attachment'
import {
Account,
Class,
@ -32,6 +32,7 @@
deleteFile,
DraftController,
draftsStore,
FileOrBlob,
getClient,
getFileMetadata,
uploadFile
@ -40,6 +41,7 @@
import textEditor, { type RefAction } from '@hcengineering/text-editor'
import { AttachIcon, StyledTextBox } from '@hcengineering/text-editor-resources'
import { ButtonSize } from '@hcengineering/ui'
import { type FileUploadCallbackParams, uploadFiles } from '@hcengineering/uploader'
import { createEventDispatcher, onDestroy } from 'svelte'
import attachment from '../plugin'
@ -150,11 +152,29 @@
}
}
async function createAttachment (file: File): Promise<{ file: Ref<Blob>, type: string } | undefined> {
if (space === undefined || objectId === undefined || _class === undefined) return
async function attachFile (file: File): Promise<{ file: Ref<Blob>, type: string } | undefined> {
try {
const uuid = await uploadFile(file)
const metadata = await getFileMetadata(file, uuid)
await createAttachment(uuid, file.name, file, metadata)
return { file: uuid, type: file.type }
} catch (err: any) {
await setPlatformStatus(unknownError(err))
}
}
async function onFileUploaded ({ uuid, name, file, metadata }: FileUploadCallbackParams): Promise<void> {
await createAttachment(uuid, name, file, metadata)
}
async function createAttachment (
uuid: Ref<Blob>,
name: string,
file: FileOrBlob,
metadata: BlobMetadata | undefined
): Promise<void> {
if (space === undefined || objectId === undefined || _class === undefined) return
try {
const _id: Ref<Attachment> = generateId()
attachments.set(_id, {
@ -166,13 +186,14 @@
space,
attachedTo: objectId,
attachedToClass: _class,
name: file.name,
name,
file: uuid,
type: file.type,
size: file.size,
lastModified: file.lastModified,
lastModified: file instanceof File ? file.lastModified : Date.now(),
metadata
})
newAttachments.add(_id)
attachments = attachments
saved = false
@ -183,7 +204,6 @@
if (useDirectAttachDelete) {
saveNewAttachment(_id)
}
return { file: uuid, type: file.type }
} catch (err: any) {
setPlatformStatus(unknownError(err))
}
@ -207,12 +227,7 @@
progress = true
const list = inputFile.files
if (list === null || list.length === 0) return
for (let index = 0; index < list.length; index++) {
const file = list.item(index)
if (file !== null) {
await createAttachment(file)
}
}
await uploadFiles(list, { onFileUploaded })
inputFile.value = ''
progress = false
}
@ -220,14 +235,8 @@
export async function fileDrop (e: DragEvent): Promise<void> {
progress = true
const list = e.dataTransfer?.files
if (list !== undefined && list.length !== 0) {
for (let index = 0; index < list.length; index++) {
const file = list.item(index)
if (file !== null) {
await createAttachment(file)
}
}
}
if (list === undefined || list.length === 0) return
await uploadFiles(list, { onFileUploaded })
progress = false
}
@ -347,15 +356,20 @@
}
const items = evt.clipboardData?.items ?? []
const files: File[] = []
for (const index in items) {
const item = items[index]
if (item.kind === 'file') {
const blob = item.getAsFile()
if (blob !== null) {
await createAttachment(blob)
files.push(blob)
}
}
}
if (files.length > 0) {
await uploadFiles(files, { onFileUploaded })
}
}
$: dispatch('attachments', {
@ -420,9 +434,7 @@
on:blur
on:focus
on:open-document
attachFile={async (file) => {
return await createAttachment(file)
}}
{attachFile}
/>
{#if attachments.size > 0 && enableAttachments}
<AttachmentsGrid

View File

@ -15,13 +15,13 @@
-->
<script lang="ts">
import { Attachment } from '@hcengineering/attachment'
import { Blob, Class, Data, Doc, DocumentQuery, Ref, Space } from '@hcengineering/core'
import { Class, Data, Doc, DocumentQuery, Ref, Space } from '@hcengineering/core'
import { IntlString } from '@hcengineering/platform'
import { Icon, Label, resizeObserver, Scroller, Spinner, Button, IconAdd } from '@hcengineering/ui'
import view, { BuildModelKey } from '@hcengineering/view'
import { Table } from '@hcengineering/view-resources'
import { getClient } from '@hcengineering/presentation'
import { uploadFiles } from '@hcengineering/uploader'
import { FileUploadCallbackParams, uploadFiles } from '@hcengineering/uploader'
import { createEventDispatcher } from 'svelte'
import attachment from '../plugin'
@ -52,23 +52,31 @@
const client = getClient()
const dispatch = createEventDispatcher()
async function onFileUploaded ({ uuid, name, file }: FileUploadCallbackParams): Promise<void> {
await createAttachment(
client,
uuid,
name,
file,
{ objectClass: object?._class ?? _class, objectId, space },
attachmentClass,
attachmentClassOptions
)
}
async function fileSelected (): Promise<void> {
const list = inputFile.files
if (list === null || list.length === 0) return
loading++
try {
await uploadFiles(list, { objectId, objectClass: object?._class ?? _class }, {}, async (uuid, name, file) => {
await createAttachment(
client,
uuid,
name,
file,
{ objectClass: object?._class ?? _class, objectId, space },
attachmentClass,
attachmentClassOptions
)
})
const options = {
onFileUploaded,
showProgress: {
target: { objectId, objectClass: object?._class ?? _class }
}
}
await uploadFiles(list, options)
} finally {
loading--
}

View File

@ -18,7 +18,7 @@
import { Panel } from '@hcengineering/panel'
import { createQuery, getClient, getFileUrl } from '@hcengineering/presentation'
import { Button, IconMoreH } from '@hcengineering/ui'
import { showFilesUploadPopup } from '@hcengineering/uploader'
import { FileUploadCallbackParams, showFilesUploadPopup } from '@hcengineering/uploader'
import view from '@hcengineering/view'
import { showMenu } from '@hcengineering/view-resources'
@ -68,27 +68,30 @@
function handleUploadFile (): void {
if (object != null) {
void showFilesUploadPopup(
{ objectId: object._id, objectClass: object._class },
{
maxNumberOfFiles: 1,
hideProgress: true
onFileUploaded,
showProgress: {
target: { objectId: object._id, objectClass: object._class }
},
maxNumberOfFiles: 1
},
{},
async (uuid, name, file, path, metadata) => {
const data = {
file: uuid,
title: name,
size: file.size,
type: file.type,
lastModified: file instanceof File ? file.lastModified : Date.now(),
metadata
}
await createFileVersion(client, _id, data)
}
{}
)
}
}
async function onFileUploaded ({ uuid, name, file, metadata }: FileUploadCallbackParams): Promise<void> {
const data = {
file: uuid,
title: name,
size: file.size,
type: file.type,
lastModified: file instanceof File ? file.lastModified : Date.now(),
metadata
}
await createFileVersion(client, _id, data)
}
</script>
{#if object && version}

View File

@ -177,7 +177,14 @@ export async function uploadFilesToDrive (dt: DataTransfer, space: Ref<Drive>, p
? { objectId: parent, objectClass: drive.class.Folder }
: { objectId: space, objectClass: drive.class.Drive }
await uploadFiles(files, target, {}, onFileUploaded)
const options = {
onFileUploaded,
showProgress: {
target
}
}
await uploadFiles(files, options)
}
export async function uploadFilesToDrivePopup (space: Ref<Drive>, parent: Ref<Folder>): Promise<void> {
@ -189,12 +196,15 @@ export async function uploadFilesToDrivePopup (space: Ref<Drive>, parent: Ref<Fo
: { objectId: space, objectClass: drive.class.Drive }
await showFilesUploadPopup(
target,
{},
{
onFileUploaded,
showProgress: {
target
}
},
{
fileManagerSelectionType: 'both'
},
onFileUploaded
}
)
}
@ -234,7 +244,7 @@ async function fileUploadCallback (space: Ref<Drive>, parent: Ref<Folder>): Prom
return current
}
const callback: FileUploadCallback = async (uuid, name, file, path, metadata) => {
const callback: FileUploadCallback = async ({ uuid, name, file, path, metadata }) => {
const folder = await findParent(path)
try {
const data = {

View File

@ -0,0 +1,28 @@
//
// Copyright © 2024 Hardcore Engineering Inc.
//
// Licensed under the Eclipse Public License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License. You may
// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//
// See the License for the specific language governing permissions and
// limitations under the License.
//
import type { IndexedObject } from '@uppy/core'
// For Uppy 4.0 compatibility
export type Meta = IndexedObject<any>
export type Body = IndexedObject<any>
/** @public */
export type UppyMeta = Meta & {
uuid: string
relativePath?: string
}
/** @public */
export type UppyBody = Body

View File

@ -14,12 +14,18 @@
//
import { type Blob, type Ref, generateId } from '@hcengineering/core'
import { getMetadata } from '@hcengineering/platform'
import presentation, { generateFileId, getFileMetadata, getUploadUrl } from '@hcengineering/presentation'
import { getMetadata, PlatformError, unknownError } from '@hcengineering/platform'
import presentation, {
type FileUploadParams,
generateFileId,
getFileMetadata,
getFileUploadParams,
getUploadUrl
} from '@hcengineering/presentation'
import { getCurrentLanguage } from '@hcengineering/theme'
import type { FileUploadCallback, FileUploadOptions } from '@hcengineering/uploader'
import type { FileUploadOptions } from '@hcengineering/uploader'
import Uppy, { type IndexedObject, type UppyOptions } from '@uppy/core'
import Uppy, { type UppyFile, type UppyOptions } from '@uppy/core'
import XHR from '@uppy/xhr-upload'
import En from '@uppy/locales/lib/en_US'
@ -29,6 +35,8 @@ import Pt from '@uppy/locales/lib/pt_PT'
import Ru from '@uppy/locales/lib/ru_RU'
import Zh from '@uppy/locales/lib/zh_CN'
import type { UppyBody, UppyMeta } from './types'
type Locale = UppyOptions['locale']
const locales: Record<string, Locale> = {
@ -44,22 +52,65 @@ function getUppyLocale (lang: string): Locale {
return locales[lang] ?? En
}
// For Uppy 4.0 compatibility
type Meta = IndexedObject<any>
type Body = IndexedObject<any>
interface XHRFileProcessor {
name: string
onBeforeUpload: (uppy: Uppy, file: UppyFile, params: FileUploadParams) => Promise<void>
onAfterUpload: (uppy: Uppy, file: UppyFile, params: FileUploadParams) => Promise<void>
}
/** @public */
export type UppyMeta = Meta & {
relativePath?: string
const FormDataFileProcessor: XHRFileProcessor = {
name: 'form-data',
onBeforeUpload: async (uppy: Uppy, file: UppyFile, { url, headers }: FileUploadParams): Promise<void> => {
const xhrUpload = 'xhrUpload' in file && typeof file.xhrUpload === 'object' ? file.xhrUpload : {}
const state = {
xhrUpload: {
...xhrUpload,
endpoint: url,
method: 'POST',
formData: true,
headers
}
}
uppy.setFileState(file.id, state)
},
onAfterUpload: async (uppy: Uppy, file: UppyFile, params: FileUploadParams): Promise<void> => {}
}
const SignedURLFileProcessor: XHRFileProcessor = {
name: 'signed-url',
onBeforeUpload: async (uppy: Uppy, file: UppyFile, { url, headers }: FileUploadParams): Promise<void> => {
const xhrUpload = 'xhrUpload' in file && typeof file.xhrUpload === 'object' ? file.xhrUpload : {}
const signedUrl = await getSignedUploadUrl(file, url)
const state = {
xhrUpload: {
...xhrUpload,
formData: false,
method: 'PUT',
endpoint: signedUrl,
headers: {
...headers,
// S3 direct upload does not require authorization
Authorization: '',
'Content-Type': file.type
}
}
}
uppy.setFileState(file.id, state)
},
onAfterUpload: async (uppy: Uppy, file: UppyFile, params: FileUploadParams): Promise<void> => {
const error = 'error' in file && file.error != null
await fetch(params.url, { method: error ? 'DELETE' : 'PUT' })
}
}
/** @public */
export type UppyBody = Body & {
uuid: string
}
export function getUppy (options: FileUploadOptions): Uppy<UppyMeta, UppyBody> {
const { onFileUploaded } = options
/** @public */
export function getUppy (options: FileUploadOptions, onFileUploaded?: FileUploadCallback): Uppy<UppyMeta, UppyBody> {
const uppyOptions: Partial<UppyOptions> = {
id: generateId(),
locale: getUppyLocale(getCurrentLanguage()),
@ -71,31 +122,14 @@ export function getUppy (options: FileUploadOptions, onFileUploaded?: FileUpload
}
}
const uppy = new Uppy<UppyMeta, UppyBody>(uppyOptions).use(XHR, {
endpoint: getUploadUrl(),
method: 'POST',
headers: {
Authorization: 'Bearer ' + (getMetadata(presentation.metadata.Token) as string)
},
getResponseError: (_, response) => {
return new Error((response as Response).statusText)
}
})
// Hack to setup shouldRetry callback on xhrUpload that is not exposed in options
const xhrUpload = uppy.getState().xhrUpload ?? {}
uppy.getState().xhrUpload = {
...xhrUpload,
shouldRetry: (response: Response) => response.status !== 413
}
const uppy = new Uppy<UppyMeta, UppyBody>(uppyOptions)
// Ensure we always have UUID
uppy.addPreProcessor(async (fileIds: string[]) => {
for (const fileId of fileIds) {
const file = uppy.getFile(fileId)
if (file != null) {
const uuid = generateFileId()
file.meta.uuid = uuid
file.meta.name = uuid
if (file != null && file.meta.uuid === undefined) {
uppy.setFileMeta(fileId, { uuid: generateFileId() })
}
}
})
@ -111,11 +145,78 @@ export function getUppy (options: FileUploadOptions, onFileUploaded?: FileUpload
const uuid = file.meta.uuid as Ref<Blob>
if (uuid !== undefined) {
const metadata = await getFileMetadata(file.data, uuid)
await onFileUploaded(uuid, file.name, file.data, file.meta.relativePath, metadata)
await onFileUploaded({
uuid,
name: file.name,
file: file.data,
path: file.meta.relativePath,
metadata
})
} else {
console.warn('missing file metadata uuid', file)
}
}
})
}
configureXHR(uppy)
return uppy
}
function configureXHR (uppy: Uppy<UppyMeta, UppyBody>): Uppy<UppyMeta, UppyBody> {
uppy.use(XHR, {
endpoint: getUploadUrl(),
method: 'POST',
headers: {
Authorization: 'Bearer ' + (getMetadata(presentation.metadata.Token) as string)
},
getResponseError: (_, response) => {
return new Error((response as Response).statusText)
}
})
// Hack to setup shouldRetry callback on xhrUpload that is not exposed in options
const xhrUpload = uppy.getState().xhrUpload ?? {}
uppy.getState().xhrUpload = {
...xhrUpload,
shouldRetry: (response: Response) => ![401, 403, 413].includes(response.status)
}
uppy.addPreProcessor(async (fileIds: string[]) => {
for (const fileId of fileIds) {
const file = uppy.getFile(fileId)
if (file != null) {
const params = getFileUploadParams(file.meta.uuid, file.data)
const processor = getXHRProcessor(file, params)
await processor.onBeforeUpload(uppy, file, params)
}
}
})
uppy.addPostProcessor(async (fileIds: string[]) => {
for (const fileId of fileIds) {
const file = uppy.getFile(fileId)
if (file != null) {
const params = getFileUploadParams(file.meta.uuid, file.data)
const processor = getXHRProcessor(file, params)
await processor.onAfterUpload(uppy, file, params)
}
}
})
return uppy
}
function getXHRProcessor (file: UppyFile, params: FileUploadParams): XHRFileProcessor {
return params.method === 'form-data' ? FormDataFileProcessor : SignedURLFileProcessor
}
async function getSignedUploadUrl (file: UppyFile, signUrl: string): Promise<string> {
const response = await fetch(signUrl, { method: 'POST' })
if (!response.ok) {
throw new PlatformError(unknownError('Failed to get signed upload url'))
}
return await response.text()
}

View File

@ -14,54 +14,45 @@
//
import { showPopup } from '@hcengineering/ui'
import {
type FileUploadCallback,
type FileUploadOptions,
type FileUploadPopupOptions,
type FileUploadTarget,
toFileWithPath
} from '@hcengineering/uploader'
import { type FileUploadOptions, type FileUploadPopupOptions, toFileWithPath } from '@hcengineering/uploader'
import FileUploadPopup from './components/FileUploadPopup.svelte'
import { dockFileUpload } from './store'
import { getUppy } from './uppy'
import { generateFileId } from '@hcengineering/presentation'
/** @public */
export async function showFilesUploadPopup (
target: FileUploadTarget,
options: FileUploadOptions,
popupOptions: FileUploadPopupOptions,
onFileUploaded: FileUploadCallback
popupOptions: FileUploadPopupOptions
): Promise<void> {
const uppy = getUppy(options, onFileUploaded)
const uppy = getUppy(options)
showPopup(FileUploadPopup, { uppy, target, options: popupOptions }, undefined, (res) => {
if (res === true && options.hideProgress !== true) {
showPopup(FileUploadPopup, { uppy, options: popupOptions }, undefined, (res) => {
if (res === true && options.showProgress !== undefined) {
const { target } = options.showProgress
dockFileUpload(target, uppy)
}
})
}
/** @public */
export async function uploadFiles (
files: File[] | FileList,
target: FileUploadTarget,
options: FileUploadOptions,
onFileUploaded: FileUploadCallback
): Promise<void> {
export async function uploadFiles (files: File[] | FileList, options: FileUploadOptions): Promise<void> {
const items = Array.from(files, (p) => toFileWithPath(p))
if (items.length === 0) return
const uppy = getUppy(options, onFileUploaded)
const uppy = getUppy(options)
for (const data of items) {
const { name, type, relativePath } = data
uppy.addFile({ name, type, data, meta: { relativePath } })
const uuid = generateFileId()
uppy.addFile({ name, type, data, meta: { name: uuid, uuid, relativePath } })
}
if (options.hideProgress !== true) {
if (options.showProgress !== undefined) {
const { target } = options.showProgress
dockFileUpload(target, uppy)
}

View File

@ -21,20 +21,10 @@ export interface FileWithPath extends File {
}
/** @public */
export type UploadFilesPopupFn = (
target: FileUploadTarget,
options: FileUploadOptions,
popupOptions: FileUploadPopupOptions,
onFileUploaded: FileUploadCallback
) => Promise<void>
export type UploadFilesPopupFn = (options: FileUploadOptions, popupOptions: FileUploadPopupOptions) => Promise<void>
/** @public */
export type UploadFilesFn = (
files: File[] | FileList,
target: FileUploadTarget,
options: FileUploadOptions,
onFileUploaded: FileUploadCallback
) => Promise<void>
export type UploadFilesFn = (files: File[] | FileList, options: FileUploadOptions) => Promise<void>
/** @public */
export interface FileUploadTarget {
@ -42,12 +32,20 @@ export interface FileUploadTarget {
objectClass: Ref<Class<Doc>>
}
/** @public */
export interface FileUploadProgressOptions {
target: FileUploadTarget
}
/** @public */
export interface FileUploadOptions {
// Uppy options
maxFileSize?: number
maxNumberOfFiles?: number
allowedFileTypes?: string[] | null
hideProgress?: boolean
onFileUploaded?: FileUploadCallback
showProgress?: FileUploadProgressOptions
}
/** @public */
@ -56,10 +54,13 @@ export interface FileUploadPopupOptions {
}
/** @public */
export type FileUploadCallback = (
uuid: Ref<PlatformBlob>,
name: string,
file: FileWithPath | Blob,
path: string | undefined,
export interface FileUploadCallbackParams {
uuid: Ref<PlatformBlob>
name: string
file: FileWithPath | Blob
path: string | undefined
metadata: Record<string, any> | undefined
) => Promise<void>
}
/** @public */
export type FileUploadCallback = (params: FileUploadCallbackParams) => Promise<void>
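
A minimal usage sketch of the reshaped API under these types; the function name `attach` is illustrative, and `FileUploadTarget` is assumed to be re-exported by the package:

```ts
import { uploadFiles, type FileUploadCallbackParams, type FileUploadTarget } from '@hcengineering/uploader'

async function attach (files: FileList, target: FileUploadTarget): Promise<void> {
  // the callback now receives a single params object instead of positional arguments
  const onFileUploaded = async ({ uuid, name, file, metadata }: FileUploadCallbackParams): Promise<void> => {
    // create the domain object (attachment, drive file, ...) referencing blob `uuid` here
    console.log('uploaded', uuid, name, file, metadata)
  }
  // the upload target moved from a positional argument into options.showProgress
  await uploadFiles(files, { onFileUploaded, showProgress: { target } })
}
```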

View File

@ -16,34 +16,27 @@
import { getResource } from '@hcengineering/platform'
import uploader from './plugin'
import type {
FileUploadCallback,
FileUploadOptions,
FileUploadPopupOptions,
FileUploadTarget,
FileWithPath
} from './types'
import type { FileUploadOptions, FileUploadPopupOptions, FileWithPath } from './types'
/** @public */
export async function showFilesUploadPopup (
target: FileUploadTarget,
options: FileUploadOptions,
popupOptions: FileUploadPopupOptions,
onFileUploaded: FileUploadCallback
popupOptions: FileUploadPopupOptions
): Promise<void> {
const fn = await getResource(uploader.function.ShowFilesUploadPopup)
await fn(target, options, popupOptions, onFileUploaded)
await fn(options, popupOptions)
}
/** @public */
export async function uploadFiles (
files: File[] | FileList,
target: FileUploadTarget,
options: FileUploadOptions,
onFileUploaded: FileUploadCallback
): Promise<void> {
export async function uploadFile (file: File, options: FileUploadOptions): Promise<void> {
const fn = await getResource(uploader.function.UploadFiles)
await fn(files, target, options, onFileUploaded)
await fn([file], options)
}
/** @public */
export async function uploadFiles (files: File[] | FileList, options: FileUploadOptions): Promise<void> {
const fn = await getResource(uploader.function.UploadFiles)
await fn(files, options)
}
/** @public */

View File

@ -2115,6 +2115,11 @@
"packageName": "@hcengineering/cloud-branding",
"projectFolder": "workers/branding",
"shouldPublish": false
},
{
"packageName": "@hcengineering/cloud-datalake",
"projectFolder": "workers/datalake",
"shouldPublish": false
}
]
}

View File

@ -15,7 +15,7 @@
import { type MeasureContext, type WorkspaceId, concatLink } from '@hcengineering/core'
import FormData from 'form-data'
import fetch from 'node-fetch'
import fetch, { type RequestInit, type Response } from 'node-fetch'
import { Readable } from 'stream'
/** @public */
@ -34,11 +34,6 @@ export interface StatObjectOutput {
size?: number
}
/** @public */
export interface PutObjectOutput {
id: string
}
interface BlobUploadError {
key: string
error: string
@ -54,7 +49,11 @@ type BlobUploadResult = BlobUploadSuccess | BlobUploadError
/** @public */
export class Client {
constructor (private readonly endpoint: string) {}
private readonly endpoint: string
constructor (host: string, port?: number) {
this.endpoint = port !== undefined ? `${host}:${port}` : host
}
getObjectUrl (ctx: MeasureContext, workspace: WorkspaceId, objectName: string): string {
const path = `/blob/${workspace.name}/${encodeURIComponent(objectName)}`
@ -63,21 +62,7 @@ export class Client {
async getObject (ctx: MeasureContext, workspace: WorkspaceId, objectName: string): Promise<Readable> {
const url = this.getObjectUrl(ctx, workspace, objectName)
let response
try {
response = await fetch(url)
} catch (err: any) {
ctx.error('network error', { error: err })
throw new Error(`Network error ${err}`)
}
if (!response.ok) {
if (response.status === 404) {
throw new Error('Not Found')
}
throw new Error('HTTP error ' + response.status)
}
const response = await fetchSafe(ctx, url)
if (response.body == null) {
ctx.error('bad datalake response', { objectName })
@ -99,20 +84,7 @@ export class Client {
Range: `bytes=${offset}-${length ?? ''}`
}
let response
try {
response = await fetch(url, { headers })
} catch (err: any) {
ctx.error('network error', { error: err })
throw new Error(`Network error ${err}`)
}
if (!response.ok) {
if (response.status === 404) {
throw new Error('Not Found')
}
throw new Error('HTTP error ' + response.status)
}
const response = await fetchSafe(ctx, url, { headers })
if (response.body == null) {
ctx.error('bad datalake response', { objectName })
@ -129,20 +101,7 @@ export class Client {
): Promise<StatObjectOutput | undefined> {
const url = this.getObjectUrl(ctx, workspace, objectName)
let response
try {
response = await fetch(url, { method: 'HEAD' })
} catch (err: any) {
ctx.error('network error', { error: err })
throw new Error(`Network error ${err}`)
}
if (!response.ok) {
if (response.status === 404) {
return undefined
}
throw new Error('HTTP error ' + response.status)
}
const response = await fetchSafe(ctx, url, { method: 'HEAD' })
const headers = response.headers
const lastModified = Date.parse(headers.get('Last-Modified') ?? '')
@ -158,30 +117,35 @@ export class Client {
async deleteObject (ctx: MeasureContext, workspace: WorkspaceId, objectName: string): Promise<void> {
const url = this.getObjectUrl(ctx, workspace, objectName)
let response
try {
response = await fetch(url, { method: 'DELETE' })
} catch (err: any) {
ctx.error('network error', { error: err })
throw new Error(`Network error ${err}`)
}
if (!response.ok) {
if (response.status === 404) {
throw new Error('Not Found')
}
throw new Error('HTTP error ' + response.status)
}
await fetchSafe(ctx, url, { method: 'DELETE' })
}
async putObject (
ctx: MeasureContext,
workspace: WorkspaceId,
objectName: string,
stream: Readable | Buffer | string,
metadata: ObjectMetadata,
size?: number
): Promise<void> {
if (size === undefined || size < 64 * 1024 * 1024) {
await ctx.with('direct-upload', {}, async (ctx) => {
await this.uploadWithFormData(ctx, workspace, objectName, stream, metadata)
})
} else {
await ctx.with('signed-url-upload', {}, async (ctx) => {
await this.uploadWithSignedURL(ctx, workspace, objectName, stream, metadata)
})
}
}
private async uploadWithFormData (
ctx: MeasureContext,
workspace: WorkspaceId,
objectName: string,
stream: Readable | Buffer | string,
metadata: ObjectMetadata
): Promise<PutObjectOutput> {
): Promise<void> {
const path = `/upload/form-data/${workspace.name}`
const url = concatLink(this.endpoint, path)
@ -196,17 +160,7 @@ export class Client {
}
form.append('file', stream, options)
let response
try {
response = await fetch(url, { method: 'POST', body: form })
} catch (err: any) {
ctx.error('network error', { error: err })
throw new Error(`Network error ${err}`)
}
if (!response.ok) {
throw new Error('HTTP error ' + response.status)
}
const response = await fetchSafe(ctx, url, { method: 'POST', body: form })
const result = (await response.json()) as BlobUploadResult[]
if (result.length !== 1) {
@ -219,8 +173,68 @@ export class Client {
if ('error' in uploadResult) {
ctx.error('error during blob upload', { objectName, error: uploadResult.error })
throw new Error('Upload failed: ' + uploadResult.error)
} else {
return { id: uploadResult.id }
}
}
private async uploadWithSignedURL (
ctx: MeasureContext,
workspace: WorkspaceId,
objectName: string,
stream: Readable | Buffer | string,
metadata: ObjectMetadata
): Promise<void> {
const url = await this.signObjectSign(ctx, workspace, objectName)
try {
await fetchSafe(ctx, url, {
body: stream,
method: 'PUT',
headers: {
'Content-Type': metadata.type,
'Content-Length': metadata.size?.toString() ?? '0',
'x-amz-meta-last-modified': metadata.lastModified.toString()
}
})
await this.signObjectComplete(ctx, workspace, objectName)
} catch {
await this.signObjectDelete(ctx, workspace, objectName)
}
}
private async signObjectSign (ctx: MeasureContext, workspace: WorkspaceId, objectName: string): Promise<string> {
const url = this.getSignObjectUrl(workspace, objectName)
const response = await fetchSafe(ctx, url, { method: 'POST' })
return await response.text()
}
private async signObjectComplete (ctx: MeasureContext, workspace: WorkspaceId, objectName: string): Promise<void> {
const url = this.getSignObjectUrl(workspace, objectName)
await fetchSafe(ctx, url, { method: 'PUT' })
}
private async signObjectDelete (ctx: MeasureContext, workspace: WorkspaceId, objectName: string): Promise<void> {
const url = this.getSignObjectUrl(workspace, objectName)
await fetchSafe(ctx, url, { method: 'DELETE' })
}
private getSignObjectUrl (workspace: WorkspaceId, objectName: string): string {
const path = `/upload/signed-url/${workspace.name}/${encodeURIComponent(objectName)}`
return concatLink(this.endpoint, path)
}
}
async function fetchSafe (ctx: MeasureContext, url: string, init?: RequestInit): Promise<Response> {
let response
try {
response = await fetch(url, init)
} catch (err: any) {
ctx.error('network error', { error: err })
throw new Error(`Network error ${err}`)
}
if (!response.ok) {
throw new Error(response.status === 404 ? 'Not Found' : 'HTTP error ' + response.status)
}
return response
}
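
A hedged usage sketch of the updated client; the endpoint is a placeholder and `ObjectMetadata` is assumed to carry the fields referenced above (`name`, `type`, `size`, `lastModified`):

```ts
import { type MeasureContext, type WorkspaceId } from '@hcengineering/core'

async function example (ctx: MeasureContext, workspace: WorkspaceId): Promise<void> {
  const client = new Client('https://datalake.example.com')
  const body = Buffer.from('hello world')
  const metadata = {
    name: 'my-object',
    type: 'text/plain',
    size: body.length,
    lastModified: Date.now()
  }
  // payloads under 64 MiB take the form-data path; larger ones go through the signed-url flow
  await client.putObject(ctx, workspace, 'my-object', body, metadata, body.length)
}
```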

View File

@ -37,7 +37,7 @@ export class DatalakeService implements StorageAdapter {
static config = 'datalake'
client: Client
constructor (readonly opt: DatalakeConfig) {
this.client = new Client(opt.endpoint)
this.client = new Client(opt.endpoint, opt.port)
}
async initialize (ctx: MeasureContext, workspaceId: WorkspaceId): Promise<void> {}
@ -129,7 +129,7 @@ export class DatalakeService implements StorageAdapter {
await ctx.with('put', {}, async (ctx) => {
await withRetry(ctx, 5, async () => {
return await this.client.putObject(ctx, workspaceId, objectName, stream, metadata)
await this.client.putObject(ctx, workspaceId, objectName, stream, metadata, size)
})
})

View File

@ -17,6 +17,7 @@ Front service is suited to deliver application bundles and resource assets, it a
* MODEL_VERSION: Specifies the required model version.
* SERVER_SECRET: Specifies the server secret.
* PREVIEW_CONFIG: Specifies the preview configuration.
* UPLOAD_CONFIG: Specifies the upload configuration.
* BRANDING_URL: Specifies the URL of the branding service.
## Preview service configuration

View File

@ -256,6 +256,7 @@ export function start (
collaboratorUrl: string
brandingUrl?: string
previewConfig: string
uploadConfig: string
pushPublicKey?: string
disableSignUp?: string
},
@ -308,6 +309,7 @@ export function start (
COLLABORATOR_URL: config.collaboratorUrl,
BRANDING_URL: config.brandingUrl,
PREVIEW_CONFIG: config.previewConfig,
UPLOAD_CONFIG: config.uploadConfig,
PUSH_PUBLIC_KEY: config.pushPublicKey,
DISABLE_SIGNUP: config.disableSignUp,
...(extraConfig ?? {})
@ -501,8 +503,15 @@ export function start (
void filesHandler(req, res)
})
// eslint-disable-next-line @typescript-eslint/no-misused-promises
app.post('/files', async (req, res) => {
app.post('/files', (req, res) => {
void handleUpload(req, res)
})
app.post('/files/*', (req, res) => {
void handleUpload(req, res)
})
const handleUpload = async (req: Request, res: Response): Promise<void> => {
await ctx.with(
'post-file',
{},
@ -538,7 +547,7 @@ export function start (
},
{ url: req.path, query: req.query }
)
})
}
const handleDelete = async (req: Request, res: Response): Promise<void> => {
try {

View File

@ -101,6 +101,11 @@ export function startFront (ctx: MeasureContext, extraConfig?: Record<string, st
process.exit(1)
}
let uploadConfig = process.env.UPLOAD_CONFIG
if (uploadConfig === undefined) {
uploadConfig = ''
}
let previewConfig = process.env.PREVIEW_CONFIG
if (previewConfig === undefined) {
// Use universal preview config
@ -136,6 +141,7 @@ export function startFront (ctx: MeasureContext, extraConfig?: Record<string, st
collaborator,
brandingUrl,
previewConfig,
uploadConfig,
pushPublicKey,
disableSignUp
}

View File

@ -2,10 +2,11 @@
"extends": "./node_modules/@hcengineering/platform-rig/profiles/default/tsconfig.json",
"compilerOptions": {
"rootDir": "./src",
"outDir": "./lib",
"declarationDir": "./types",
"tsBuildInfoFile": ".build/build.tsbuildinfo",
"types": ["@cloudflare/workers-types", "jest"]
"rootDir": "./src",
"outDir": "./lib",
"declarationDir": "./types",
"tsBuildInfoFile": ".build/build.tsbuildinfo",
"types": ["@cloudflare/workers-types", "jest"],
"lib": ["esnext"]
}
}

workers/.gitignore vendored Normal file
View File

@ -0,0 +1 @@
**/.dev.vars

View File

@ -37,7 +37,7 @@
"@types/jest": "^29.5.5"
},
"dependencies": {
"itty-router": "^5.0.17"
"itty-router": "^5.0.18"
},
"private": true
}

View File

@ -2,10 +2,11 @@
"extends": "./node_modules/@hcengineering/platform-rig/profiles/default/tsconfig.json",
"compilerOptions": {
"rootDir": "./src",
"outDir": "./lib",
"declarationDir": "./types",
"tsBuildInfoFile": ".build/build.tsbuildinfo",
"types": ["@cloudflare/workers-types", "jest"]
"rootDir": "./src",
"outDir": "./lib",
"declarationDir": "./types",
"tsBuildInfoFile": ".build/build.tsbuildinfo",
"types": ["@cloudflare/workers-types", "jest"],
"lib": ["esnext"]
}
}

View File

@ -0,0 +1,7 @@
module.exports = {
extends: ['./node_modules/@hcengineering/platform-rig/profiles/node/eslint.config.json'],
parserOptions: {
tsconfigRootDir: __dirname,
project: './tsconfig.json'
}
}

View File

@ -0,0 +1,5 @@
{
"$schema": "https://developer.microsoft.com/json-schemas/rig-package/rig.schema.json",
"rigPackageName": "@hcengineering/platform-rig",
"rigProfile": "node"
}

View File

@ -0,0 +1,7 @@
module.exports = {
preset: 'ts-jest',
testEnvironment: 'node',
testMatch: ['**/?(*.)+(spec|test).[jt]s?(x)'],
roots: ["./src"],
coverageReporters: ["text-summary", "html"]
}

View File

@ -0,0 +1,43 @@
{
"name": "@hcengineering/cloud-datalake",
"version": "0.6.0",
"main": "lib/index.js",
"types": "types/index.d.ts",
"template": "@hcengineering/cloud-package",
"scripts": {
"deploy": "wrangler deploy",
"dev": "wrangler dev --port 4021",
"start": "wrangler dev --port 4021",
"cf-typegen": "wrangler types",
"build": "compile",
"build:watch": "compile",
"test": "jest --passWithNoTests --silent --forceExit",
"format": "format src",
"_phase:build": "compile transpile src",
"_phase:test": "jest --passWithNoTests --silent --forceExit",
"_phase:format": "format src",
"_phase:validate": "compile validate"
},
"devDependencies": {
"@hcengineering/platform-rig": "^0.6.0",
"@cloudflare/workers-types": "^4.20240729.0",
"typescript": "^5.3.3",
"wrangler": "^3.80.1",
"jest": "^29.7.0",
"prettier": "^3.1.0",
"ts-jest": "^29.1.1",
"@typescript-eslint/eslint-plugin": "^6.11.0",
"@typescript-eslint/parser": "^6.11.0",
"eslint-config-standard-with-typescript": "^40.0.0",
"eslint-plugin-import": "^2.26.0",
"eslint-plugin-n": "^15.4.0",
"eslint-plugin-promise": "^6.1.1",
"eslint": "^8.54.0",
"@types/jest": "^29.5.5"
},
"dependencies": {
"aws4fetch": "^1.0.20",
"itty-router": "^5.0.18",
"postgres": "^3.4.4"
}
}

View File

@ -0,0 +1,15 @@
FROM cockroachdb/cockroach:latest
ADD init.sh /cockroach/
RUN chmod a+x /cockroach/init.sh
ADD logs.yaml /cockroach/
ADD optimizations.sql /cockroach/
ADD datalake.sql /cockroach/
WORKDIR /cockroach/
EXPOSE 8080
EXPOSE 26257
ENTRYPOINT ["/cockroach/init.sh"]

View File

@ -0,0 +1,5 @@
# Running CockroachDB in Docker
```bash
docker build -t cockroach:dev .
docker run -d --name cockroach -p 8080:8080 -p 26257:26257 cockroach:dev
```

View File

@ -0,0 +1,34 @@
CREATE SCHEMA IF NOT EXISTS blob;
DROP TABLE IF EXISTS blob.blob;
DROP TABLE IF EXISTS blob.data;
DROP TYPE IF EXISTS blob.content_type;
DROP TYPE IF EXISTS blob.location;
-- B L O B
CREATE TYPE blob.content_type AS ENUM ('application','audio','font','image','model','text','video');
CREATE TYPE blob.location AS ENUM ('kv', 'weur', 'eeur', 'wnam', 'enam', 'apac');
\echo "Creating blob.data..."
CREATE TABLE blob.data (
hash UUID NOT NULL,
location blob.location NOT NULL,
size INT8 NOT NULL,
filename UUID NOT NULL,
type blob.content_type NOT NULL,
subtype STRING(64) NOT NULL,
CONSTRAINT pk_data PRIMARY KEY (hash, location)
);
\echo "Creating blob.blob..."
CREATE TABLE blob.blob (
workspace STRING(255) NOT NULL,
name STRING(255) NOT NULL,
hash UUID NOT NULL,
location blob.location NOT NULL,
deleted BOOL NOT NULL,
CONSTRAINT pk_blob PRIMARY KEY (workspace, name),
CONSTRAINT fk_data FOREIGN KEY (hash, location) REFERENCES blob.data (hash, location)
);
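
The worker's `db` module is not shown in this hunk; as a hedged sketch inferred from the call sites in blob.ts, its `getData` lookup might read against this schema roughly like so, using the `postgres` tagged-template client:

```ts
import type postgres from 'postgres'

interface BlobDataRecord {
  hash: string
  location: string
  filename: string
  size: number
  type: string
  subtype: string
}

// deduplication lookup: is this content hash already stored at this location?
export async function getData (
  sql: postgres.Sql,
  { hash, location }: { hash: string, location: string }
): Promise<BlobDataRecord | null> {
  const rows = await sql<BlobDataRecord[]>`
    SELECT hash, location, filename, size, type, subtype
    FROM blob.data
    WHERE hash = ${hash} AND location = ${location}
  `
  return rows.length > 0 ? rows[0] : null
}
```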

View File

@ -0,0 +1,9 @@
#!/usr/bin/env bash
./cockroach start-single-node --insecure --log-config-file=logs.yaml --cache=.25 --background --store=type=mem,size=50%
./cockroach sql --insecure --file optimizations.sql
./cockroach sql --insecure --execute="CREATE DATABASE datalake;"
./cockroach sql --insecure --database=datalake --file datalake.sql
cd /cockroach/cockroach-data/logs
tail -f cockroach.log

View File

@ -0,0 +1,67 @@
file-defaults:
max-file-size: 10MiB
max-group-size: 100MiB
file-permissions: 644
buffered-writes: true
filter: WARNING
format: crdb-v2
redact: false
redactable: true
exit-on-error: true
auditable: false
dir: cockroach-data/logs
fluent-defaults:
filter: WARNING
format: json-fluent-compact
redact: false
redactable: true
exit-on-error: false
auditable: false
http-defaults:
method: POST
unsafe-tls: false
timeout: 0s
disable-keep-alives: false
filter: WARNING
format: json-compact
redact: false
redactable: true
exit-on-error: false
auditable: false
sinks:
file-groups:
default:
channels:
WARNING: all
health:
channels: [HEALTH]
pebble:
channels: [STORAGE]
security:
channels: [PRIVILEGES, USER_ADMIN]
auditable: true
sql-audit:
channels: [SENSITIVE_ACCESS]
auditable: true
sql-auth:
channels: [SESSIONS]
auditable: true
sql-exec:
channels: [SQL_EXEC]
sql-slow:
channels: [SQL_PERF]
sql-slow-internal-only:
channels: [SQL_INTERNAL_PERF]
telemetry:
channels: [TELEMETRY]
max-file-size: 100KiB
max-group-size: 1.0MiB
stderr:
channels: all
filter: NONE
redact: false
redactable: true
exit-on-error: true
capture-stray-errors:
enable: true
max-group-size: 100MiB

View File

@ -0,0 +1,11 @@
-- see https://www.cockroachlabs.com/docs/v21.2/local-testing.html#use-a-local-single-node-cluster-with-in-memory-storage
SET CLUSTER SETTING kv.raft_log.disable_synchronization_unsafe = true;
SET CLUSTER SETTING kv.range_merge.queue_interval = '50ms';
SET CLUSTER SETTING jobs.registry.interval.gc = '30s';
SET CLUSTER SETTING jobs.registry.interval.cancel = '180s';
SET CLUSTER SETTING jobs.retention_time = '15s';
--SET CLUSTER SETTING schemachanger.backfiller.buffer_increment = '128 KiB';
SET CLUSTER SETTING sql.stats.automatic_collection.enabled = false;
SET CLUSTER SETTING kv.range_split.by_load_merge_delay = '5s';
ALTER RANGE default CONFIGURE ZONE USING "gc.ttlseconds" = 600;
ALTER DATABASE system CONFIGURE ZONE USING "gc.ttlseconds" = 600;

View File

@ -0,0 +1,285 @@
//
// Copyright © 2024 Hardcore Engineering Inc.
//
// Licensed under the Eclipse Public License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License. You may
// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//
// See the License for the specific language governing permissions and
// limitations under the License.
//
import { error, json } from 'itty-router'
import postgres from 'postgres'
import * as db from './db'
import { toUUID } from './encodings'
import { selectStorage } from './storage'
import { type UUID } from './types'
import { copyVideo, deleteVideo } from './video'
const expires = 86400
const cacheControl = `public,max-age=${expires}`
// 64MB hash limit
const HASH_LIMIT = 64 * 1024 * 1024
interface BlobMetadata {
lastModified: number
type: string
size: number
name: string
}
export function getBlobURL (request: Request, workspace: string, name: string): string {
const path = `/blob/${workspace}/${name}`
return new URL(path, request.url).toString()
}
export async function handleBlobGet (
request: Request,
env: Env,
ctx: ExecutionContext,
workspace: string,
name: string
): Promise<Response> {
const sql = postgres(env.HYPERDRIVE.connectionString)
const { bucket } = selectStorage(env, workspace)
const blob = await db.getBlob(sql, { workspace, name })
if (blob === null || blob.deleted) {
return error(404)
}
const cache = caches.default
const cached = await cache.match(request)
if (cached !== undefined) {
return cached
}
const range = request.headers.has('Range') ? request.headers : undefined
const object = await bucket.get(blob.filename, { range })
if (object === null) {
return error(404)
}
const headers = r2MetadataHeaders(object)
if (range !== undefined && object?.range !== undefined) {
headers.set('Content-Range', rangeHeader(object.range, object.size))
}
const length = object?.range !== undefined && 'length' in object.range ? object?.range?.length : undefined
const status = length !== undefined && length < object.size ? 206 : 200
const response = new Response(object?.body, { headers, status })
ctx.waitUntil(cache.put(request, response.clone()))
return response
}
export async function handleBlobHead (
request: Request,
env: Env,
ctx: ExecutionContext,
workspace: string,
name: string
): Promise<Response> {
const sql = postgres(env.HYPERDRIVE.connectionString)
const { bucket } = selectStorage(env, workspace)
const blob = await db.getBlob(sql, { workspace, name })
if (blob === null) {
return error(404)
}
const head = await bucket.head(blob.filename)
if (head?.httpMetadata === undefined) {
return error(404)
}
const headers = r2MetadataHeaders(head)
return new Response(null, { headers, status: 200 })
}
export async function deleteBlob (env: Env, workspace: string, name: string): Promise<Response> {
const sql = postgres(env.HYPERDRIVE.connectionString)
try {
await Promise.all([db.deleteBlob(sql, { workspace, name }), deleteVideo(env, workspace, name)])
return new Response(null, { status: 204 })
} catch (err: any) {
const message = err instanceof Error ? err.message : String(err)
console.error({ error: 'failed to delete blob: ' + message })
return error(500)
}
}
export async function postBlobFormData (request: Request, env: Env, workspace: string): Promise<Response> {
const sql = postgres(env.HYPERDRIVE.connectionString)
const formData = await request.formData()
const files: [File, key: string][] = []
formData.forEach((value: any, key: string) => {
if (typeof value === 'object') files.push([value, key])
})
const result = await Promise.all(
files.map(async ([file, key]) => {
const { name, type, lastModified } = file
try {
const metadata = await saveBlob(env, sql, file, type, workspace, name, lastModified)
// TODO: this should probably happen via a queue; keep it inline for now
if (type.startsWith('video/')) {
const blobURL = getBlobURL(request, workspace, name)
await copyVideo(env, blobURL, workspace, name)
}
return { key, metadata }
} catch (err: any) {
const error = err instanceof Error ? err.message : String(err)
console.error('failed to upload blob:', error)
return { key, error }
}
})
)
return json(result)
}
async function saveBlob (
env: Env,
sql: postgres.Sql,
file: File,
type: string,
workspace: string,
name: string,
lastModified: number
): Promise<BlobMetadata> {
const { location, bucket } = selectStorage(env, workspace)
const size = file.size
const [mimetype, subtype] = type.split('/')
const httpMetadata = { contentType: type, cacheControl }
const filename = getUniqueFilename()
const sha256hash = await getSha256(file)
if (sha256hash !== null) {
// We know the hash upfront and can check for existing content
const hash = sha256hash
const data = await db.getData(sql, { hash, location })
if (data !== null) {
// Content already uploaded, just register the blob
await db.createBlob(sql, { workspace, name, hash, location })
} else {
await bucket.put(filename, file, { httpMetadata })
await sql.begin((sql) => [
db.createData(sql, { hash, location, filename, type: mimetype, subtype, size }),
db.createBlob(sql, { workspace, name, hash, location })
])
}
return { type, size, lastModified, name }
} else {
// For large files we cannot calculate the checksum beforehand,
// so upload the file under a unique filename and obtain the checksum afterwards
const object = await bucket.put(filename, file, { httpMetadata })
const hash =
object.checksums.md5 !== undefined ? getMd5Checksum(object.checksums.md5) : (crypto.randomUUID() as UUID)
const data = await db.getData(sql, { hash, location })
if (data !== null) {
// We found an existing blob with the same hash
// we can safely remove the existing blob from storage
await Promise.all([bucket.delete(filename), db.createBlob(sql, { workspace, name, hash, location })])
} else {
// Otherwise register a new hash and blob
await sql.begin((sql) => [
db.createData(sql, { hash, location, filename, type: mimetype, subtype, size }),
db.createBlob(sql, { workspace, name, hash, location })
])
}
return { type, size, lastModified, name }
}
}
export async function handleBlobUploaded (env: Env, workspace: string, name: string, filename: UUID): Promise<void> {
const sql = postgres(env.HYPERDRIVE.connectionString)
const { location, bucket } = selectStorage(env, workspace)
const object = await bucket.head(filename)
if (object?.httpMetadata === undefined) {
throw Error('blob not found')
}
const hash = object.checksums.md5 !== undefined ? getMd5Checksum(object.checksums.md5) : (crypto.randomUUID() as UUID)
const data = await db.getData(sql, { hash, location })
if (data !== null) {
await Promise.all([bucket.delete(filename), db.createBlob(sql, { workspace, name, hash, location })])
} else {
const size = object.size
const type = object.httpMetadata.contentType ?? 'application/octet-stream'
const [mimetype, subtype] = type.split('/')
await db.createData(sql, { hash, location, filename, type: mimetype, subtype, size })
await db.createBlob(sql, { workspace, name, hash, location })
}
}
function getUniqueFilename (): UUID {
return crypto.randomUUID() as UUID
}
async function getSha256 (file: File): Promise<UUID | null> {
if (file.size > HASH_LIMIT) {
return null
}
const digestStream = new crypto.DigestStream('SHA-256')
await file.stream().pipeTo(digestStream)
const digest = await digestStream.digest
return toUUID(new Uint8Array(digest))
}
function getMd5Checksum (digest: ArrayBuffer): UUID {
return toUUID(new Uint8Array(digest))
}
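// rangeHeader formats an R2Range as a Content-Range value: a parsed range of
// { offset: 0, length: 1024 } on a 4096-byte object yields 'bytes 0-1023/4096',
// and a suffix range { suffix: 100 } yields 'bytes 3996-4095/4096'.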
function rangeHeader (range: R2Range, size: number): string {
const offset = 'offset' in range ? range.offset : undefined
const length = 'length' in range ? range.length : undefined
const suffix = 'suffix' in range ? range.suffix : undefined
const start = suffix !== undefined ? size - suffix : offset ?? 0
const end = suffix !== undefined ? size : length !== undefined ? start + length : size
return `bytes ${start}-${end - 1}/${size}`
}
function r2MetadataHeaders (head: R2Object): Headers {
return head.httpMetadata !== undefined
? new Headers({
'Accept-Ranges': 'bytes',
'Content-Length': head.size.toString(),
'Content-Type': head.httpMetadata.contentType ?? '',
'Cache-Control': head.httpMetadata.cacheControl ?? cacheControl,
'Last-Modified': head.uploaded.toUTCString(),
ETag: head.httpEtag
})
: new Headers({
'Accept-Ranges': 'bytes',
'Content-Length': head.size.toString(),
'Cache-Control': cacheControl,
'Last-Modified': head.uploaded.toUTCString(),
ETag: head.httpEtag
})
}
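Taken together these handlers form a small content-addressed store. A minimal client sketch against the routes above (the base URL and workspace id are assumptions for illustration):

// Hypothetical client; base URL and workspace are illustrative
const base = 'https://datalake.example.com'
const workspace = 'my-workspace'

async function uploadBlob (name: string, data: Blob, type: string): Promise<void> {
  const form = new FormData()
  form.append(name, new File([data], name, { type }))
  const res = await fetch(`${base}/upload/form-data/${workspace}`, { method: 'POST', body: form })
  if (!res.ok) throw new Error(`upload failed: ${res.status}`)
}

async function downloadBlob (name: string): Promise<Blob> {
  const res = await fetch(`${base}/blob/${workspace}/${name}`)
  if (!res.ok) throw new Error(`download failed: ${res.status}`)
  return await res.blob()
}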

workers/datalake/src/cors.ts Normal file
View File

@ -0,0 +1,103 @@
//
// Copyright © 2024 Hardcore Engineering Inc.
//
// Licensed under the Eclipse Public License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License. You may
// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//
// See the License for the specific language governing permissions and
// limitations under the License.
//
import { type IRequest } from 'itty-router'
// This is a copy of cors.ts from itty-router with the following issues fixed:
// - https://github.com/kwhitley/itty-router/issues/242
// - https://github.com/kwhitley/itty-router/issues/249
export interface CorsOptions {
credentials?: true
origin?: boolean | string | string[] | RegExp | ((origin: string) => string | undefined)
maxAge?: number
allowMethods?: string | string[]
allowHeaders?: any
exposeHeaders?: string | string[]
}
export type Preflight = (request: IRequest) => Response | undefined
export type Corsify = (response: Response, request?: IRequest) => Response | undefined
export interface CorsPair {
preflight: Preflight
corsify: Corsify
}
// Create CORS function with default options.
export const cors = (options: CorsOptions = {}): CorsPair => {
// Destructure and set defaults for options.
const { origin = '*', credentials = false, allowMethods = '*', allowHeaders, exposeHeaders, maxAge } = options
const getAccessControlOrigin = (request?: Request): string | null | undefined => {
const requestOrigin = request?.headers.get('origin') // may be null if no request passed
if (requestOrigin === undefined || requestOrigin === null) return requestOrigin
if (origin === true) return requestOrigin
if (origin instanceof RegExp) return origin.test(requestOrigin) ? requestOrigin : undefined
if (Array.isArray(origin)) return origin.includes(requestOrigin) ? requestOrigin : undefined
if (origin instanceof Function) return origin(requestOrigin) ?? undefined
return origin === '*' && credentials ? requestOrigin : (origin as string)
}
const appendHeadersAndReturn = (response: Response, headers: Record<string, any>): Response => {
for (const [key, value] of Object.entries(headers)) {
if (value !== undefined && value !== null && value !== '') {
response.headers.append(key, value)
}
}
return response
}
const preflight = (request: Request): Response | undefined => {
if (request.method === 'OPTIONS') {
const response = new Response(null, { status: 204 })
const allowMethodsHeader = Array.isArray(allowMethods) ? allowMethods.join(',') : allowMethods
const allowHeadersHeader = Array.isArray(allowHeaders) ? allowHeaders.join(',') : allowHeaders
const exposeHeadersHeader = Array.isArray(exposeHeaders) ? exposeHeaders.join(',') : exposeHeaders
return appendHeadersAndReturn(response, {
'access-control-allow-origin': getAccessControlOrigin(request),
'access-control-allow-methods': allowMethodsHeader,
'access-control-expose-headers': exposeHeadersHeader,
'access-control-allow-headers': allowHeadersHeader ?? request.headers.get('access-control-request-headers'),
'access-control-max-age': maxAge,
'access-control-allow-credentials': credentials
})
} // otherwise ignore
}
const corsify = (response: Response, request?: Request): Response | undefined => {
// ignore if already has CORS headers
if (response?.headers?.has('access-control-allow-origin') || response.status === 101) {
return response
}
const responseCopy = new Response(response.body, {
status: response.status,
statusText: response.statusText,
headers: response.headers
})
return appendHeadersAndReturn(responseCopy, {
'access-control-allow-origin': getAccessControlOrigin(request),
'access-control-allow-credentials': credentials
})
}
// Return corsify and preflight methods.
return { corsify, preflight }
}
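A sketch of a stricter configuration (the origins and methods here are illustrative):

const { preflight, corsify } = cors({
  origin: ['https://app.example.com'], // echo only allow-listed origins
  allowMethods: ['GET', 'POST', 'DELETE'],
  credentials: true,
  maxAge: 3600
})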

workers/datalake/src/db.ts Normal file
View File

@ -0,0 +1,105 @@
//
// Copyright © 2024 Hardcore Engineering Inc.
//
// Licensed under the Eclipse Public License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License. You may
// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//
// See the License for the specific language governing permissions and
// limitations under the License.
//
import type postgres from 'postgres'
import { type Location, type UUID } from './types'
export interface BlobDataId {
hash: UUID
location: Location
}
export interface BlobDataRecord extends BlobDataId {
filename: UUID
size: number
type: string
subtype: string
}
export interface BlobId {
workspace: string
name: string
}
export interface BlobRecord extends BlobId {
hash: UUID
location: Location
deleted: boolean
}
export interface BlobRecordWithFilename extends BlobRecord {
filename: string
}
export async function getData (sql: postgres.Sql, dataId: BlobDataId): Promise<BlobDataRecord | null> {
const { hash, location } = dataId
const rows = await sql<BlobDataRecord[]>`
SELECT hash, location, filename, size, type, subtype
FROM blob.data
WHERE hash = ${hash} AND location = ${location}
`
if (rows.length > 0) {
return rows[0]
}
return null
}
export async function createData (sql: postgres.Sql, data: BlobDataRecord): Promise<void> {
const { hash, location, filename, size, type, subtype } = data
await sql`
UPSERT INTO blob.data (hash, location, filename, size, type, subtype)
VALUES (${hash}, ${location}, ${filename}, ${size}, ${type}, ${subtype})
`
}
export async function getBlob (sql: postgres.Sql, blobId: BlobId): Promise<BlobRecordWithFilename | null> {
const { workspace, name } = blobId
const rows = await sql<BlobRecordWithFilename[]>`
SELECT b.workspace, b.name, b.hash, b.location, b.deleted, d.filename
FROM blob.blob AS b
JOIN blob.data AS d ON b.hash = d.hash AND b.location = d.location
WHERE b.workspace = ${workspace} AND b.name = ${name}
`
if (rows.length > 0) {
return rows[0]
}
return null
}
export async function createBlob (sql: postgres.Sql, blob: Omit<BlobRecord, 'deleted'>): Promise<void> {
const { workspace, name, hash, location } = blob
await sql`
UPSERT INTO blob.blob (workspace, name, hash, location, deleted)
VALUES (${workspace}, ${name}, ${hash}, ${location}, false)
`
}
export async function deleteBlob (sql: postgres.Sql, blob: BlobId): Promise<void> {
const { workspace, name } = blob
await sql`
UPDATE blob.blob
SET deleted = true
WHERE workspace = ${workspace} AND name = ${name}
`
}
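A usage sketch of the two-table scheme: blob.data holds deduplicated content keyed by hash and location, while blob.blob maps workspace-scoped names onto it (the workspace and name values below are illustrative, and env is the worker's Env binding):

const sql = postgres(env.HYPERDRIVE.connectionString)
const blob = await getBlob(sql, { workspace: 'ws1', name: 'avatar.png' })
if (blob !== null && !blob.deleted) {
  // filename is the R2 object key backing this name; location picks the bucket
  console.log(blob.filename, blob.location)
}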

workers/datalake/src/encodings.ts Normal file
View File

@ -0,0 +1,37 @@
//
// Copyright © 2024 Hardcore Engineering Inc.
//
// Licensed under the Eclipse Public License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License. You may
// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//
// See the License for the specific language governing permissions and
// limitations under the License.
//
import { type UUID } from './types'
export const toUUID = (buffer: Uint8Array): UUID => {
const hex = toHex(buffer)
const hex32 = hex.slice(0, 32).padStart(32, '0')
return formatHexAsUUID(hex32)
}
export const toHex = (buffer: Uint8Array): string => {
return Array.from(buffer)
.map((b) => b.toString(16).padStart(2, '0'))
.join('')
}
export const etag = (id: string): string => `"${id}"`
export function formatHexAsUUID (hexString: string): UUID {
if (hexString.length !== 32) {
throw new Error('Hex string must be exactly 32 characters long.')
}
return hexString.replace(/^(.{8})(.{4})(.{4})(.{4})(.{12})$/, '$1-$2-$3-$4-$5') as UUID
}
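For instance, the worker derives blob ids from SHA-256 digests; folding the well-known digest of 'hello' through toUUID gives (a sketch):

const bytes = new TextEncoder().encode('hello')
const digest = await crypto.subtle.digest('SHA-256', bytes)
// first 16 bytes of the digest, formatted as a UUID:
// '2cf24dba-5fb0-a30e-26e8-3b2ac5b9e29e'
const id = toUUID(new Uint8Array(digest))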

workers/datalake/src/image.ts Normal file
View File

@ -0,0 +1,50 @@
//
// Copyright © 2024 Hardcore Engineering Inc.
//
// Licensed under the Eclipse Public License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License. You may
// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//
// See the License for the specific language governing permissions and
// limitations under the License.
//
import { getBlobURL } from './blob'
const preferredImageFormats = ['webp', 'avif', 'jpeg', 'png']
export async function getImage (
request: Request,
workspace: string,
name: string,
transform: string
): Promise<Response> {
const Accept = request.headers.get('Accept') ?? 'image/*'
const image: Record<string, string> = {}
// select format based on Accept header
const formats = Accept.split(',')
for (const format of formats) {
const [type] = format.split(';')
const [clazz, kind] = type.split('/')
if (clazz === 'image' && preferredImageFormats.includes(kind)) {
image.format = kind
break
}
}
// apply transforms
transform.split(',').reduce((acc, param) => {
const [key, value] = param.split('=')
acc[key] = value
return acc
}, image)
const blobURL = getBlobURL(request, workspace, name)
const imageRequest = new Request(blobURL, { headers: { Accept } })
return await fetch(imageRequest, { cf: { image, cacheTtl: 3600 } })
}
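So a request like GET /image/width=300,fit=cover/ws1/photo.png from a browser that accepts image/webp ends up calling Cloudflare image resizing with options { format: 'webp', width: '300', fit: 'cover' } (the path values here are illustrative).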

workers/datalake/src/index.ts Normal file
View File

@ -0,0 +1,73 @@
//
// Copyright © 2024 Hardcore Engineering Inc.
//
// Licensed under the Eclipse Public License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License. You may
// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//
// See the License for the specific language governing permissions and
// limitations under the License.
//
import { type IRequest, Router, error, html } from 'itty-router'
import {
deleteBlob as handleBlobDelete,
handleBlobGet,
handleBlobHead,
postBlobFormData as handleUploadFormData
} from './blob'
import { cors } from './cors'
import { getImage as handleImageGet } from './image'
import { getVideoMeta as handleVideoMetaGet } from './video'
import { handleSignAbort, handleSignComplete, handleSignCreate } from './sign'
const { preflight, corsify } = cors({
maxAge: 86400
})
export default {
async fetch (request, env, ctx): Promise<Response> {
const router = Router<IRequest>({
before: [preflight],
finally: [corsify]
})
router
.get('/blob/:workspace/:name', ({ params }) => handleBlobGet(request, env, ctx, params.workspace, params.name))
.head('/blob/:workspace/:name', ({ params }) => handleBlobHead(request, env, ctx, params.workspace, params.name))
.delete('/blob/:workspace/:name', ({ params }) => handleBlobDelete(env, params.workspace, params.name))
// Image
.get('/image/:transform/:workspace/:name', ({ params }) =>
handleImageGet(request, params.workspace, params.name, params.transform)
)
// Video
.get('/video/:workspace/:name/meta', ({ params }) =>
handleVideoMetaGet(request, env, ctx, params.workspace, params.name)
)
// Form Data
.post('/upload/form-data/:workspace', ({ params }) => handleUploadFormData(request, env, params.workspace))
// Signed URL
.post('/upload/signed-url/:workspace/:name', ({ params }) =>
handleSignCreate(request, env, ctx, params.workspace, params.name)
)
.put('/upload/signed-url/:workspace/:name', ({ params }) =>
handleSignComplete(request, env, ctx, params.workspace, params.name)
)
.delete('/upload/signed-url/:workspace/:name', ({ params }) =>
handleSignAbort(request, env, ctx, params.workspace, params.name)
)
.all('/', () =>
html(
`Huly&reg; Datalake&trade; <a href="https://huly.io">https://huly.io</a>
&copy; 2024 <a href="https://hulylabs.com">Huly Labs</a>`
)
)
.all('*', () => error(404))
return await router.fetch(request).catch(error)
}
} satisfies ExportedHandler<Env>

workers/datalake/src/sign.ts Normal file
View File

@ -0,0 +1,136 @@
//
// Copyright © 2024 Hardcore Engineering Inc.
//
// Licensed under the Eclipse Public License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License. You may
// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//
// See the License for the specific language governing permissions and
// limitations under the License.
//
import { AwsClient } from 'aws4fetch'
import { error } from 'itty-router'
import { handleBlobUploaded } from './blob'
import { type UUID } from './types'
import { selectStorage, type Storage } from './storage'
const S3_SIGNED_LINK_TTL = 3600
interface SignBlobInfo {
uuid: UUID
}
function signBlobKey (workspace: string, name: string): string {
return `s/${workspace}/${name}`
}
function getS3Client (storage: Storage): AwsClient {
return new AwsClient({
service: 's3',
region: 'auto',
accessKeyId: storage.bucketAccessKey,
secretAccessKey: storage.bucketSecretKey
})
}
export async function handleSignCreate (
request: Request,
env: Env,
ctx: ExecutionContext,
workspace: string,
name: string
): Promise<Response> {
const storage = selectStorage(env, workspace)
const accountId = env.R2_ACCOUNT_ID
const key = signBlobKey(workspace, name)
const uuid = crypto.randomUUID() as UUID
// Generate R2 object link
const url = new URL(`https://${storage.bucketName}.${accountId}.r2.cloudflarestorage.com`)
url.pathname = uuid
url.searchParams.set('X-Amz-Expires', S3_SIGNED_LINK_TTL.toString())
// Sign R2 object link
let signed: Request
try {
const client = getS3Client(storage)
signed = await client.sign(new Request(url, { method: 'PUT' }), { aws: { signQuery: true } })
} catch (err: any) {
console.error({ error: 'failed to generate signed url', message: `${err}` })
return error(500, 'failed to generate signed url')
}
// Save upload details
const s3BlobInfo: SignBlobInfo = { uuid }
await env.datalake_blobs.put(key, JSON.stringify(s3BlobInfo), { expirationTtl: S3_SIGNED_LINK_TTL })
const headers = new Headers({
Expires: new Date(Date.now() + S3_SIGNED_LINK_TTL * 1000).toISOString()
})
return new Response(signed.url, { status: 200, headers })
}
export async function handleSignComplete (
request: Request,
env: Env,
ctx: ExecutionContext,
workspace: string,
name: string
): Promise<Response> {
const { bucket } = selectStorage(env, workspace)
const key = signBlobKey(workspace, name)
// Ensure we generated presigned URL earlier
// TODO: what if we arrive after the expiration date?
const signBlobInfo = await env.datalake_blobs.get<SignBlobInfo>(key, { type: 'json' })
if (signBlobInfo === null) {
console.error({ error: 'blob sign info not found', workspace, name })
return error(404)
}
// Ensure the blob has been uploaded; head() avoids fetching the body just to check existence
const { uuid } = signBlobInfo
const head = await bucket.head(uuid)
if (head === null) {
console.error({ error: 'blob not found', workspace, name, uuid })
return error(400)
}
try {
await handleBlobUploaded(env, workspace, name, uuid)
} catch (err) {
const message = err instanceof Error ? err.message : String(err)
console.error({ error: message, workspace, name, uuid })
return error(500, 'failed to upload blob')
}
await env.datalake_blobs.delete(key)
return new Response(null, { status: 201 })
}
export async function handleSignAbort (
request: Request,
env: Env,
ctx: ExecutionContext,
workspace: string,
name: string
): Promise<Response> {
const key = signBlobKey(workspace, name)
// Check if the blob has been uploaded
const s3BlobInfo = await env.datalake_blobs.get<SignBlobInfo>(key, { type: 'json' })
if (s3BlobInfo !== null) {
await env.datalake_blobs.delete(key)
}
return new Response(null, { status: 204 })
}
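From a client's perspective the flow is create, upload, confirm. A sketch, reusing the hypothetical base and workspace constants from the blob client sketch above (name is the blob name, file the payload):

// 1. ask the worker for a presigned R2 URL
const create = await fetch(`${base}/upload/signed-url/${workspace}/${name}`, { method: 'POST' })
const signedURL = await create.text()
// 2. upload the payload directly to R2
await fetch(signedURL, { method: 'PUT', body: file })
// 3. confirm, so the worker registers the blob in the database
await fetch(`${base}/upload/signed-url/${workspace}/${name}`, { method: 'PUT' })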

workers/datalake/src/storage.ts Normal file
View File

@ -0,0 +1,76 @@
//
// Copyright © 2024 Hardcore Engineering Inc.
//
// Licensed under the Eclipse Public License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License. You may
// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//
// See the License for the specific language governing permissions and
// limitations under the License.
//
import { type Location } from './types'
export interface Storage {
location: Location
bucket: R2Bucket
bucketName: string
bucketAccessKey: string
bucketSecretKey: string
}
export function selectStorage (env: Env, workspace: string): Storage {
const location = selectLocation(env, workspace)
switch (location) {
case 'apac':
return {
location,
bucket: env.DATALAKE_APAC,
bucketName: env.DATALAKE_APAC_BUCKET_NAME,
bucketAccessKey: env.DATALAKE_APAC_ACCESS_KEY,
bucketSecretKey: env.DATALAKE_APAC_SECRET_KEY
}
case 'eeur':
return {
location,
bucket: env.DATALAKE_EEUR,
bucketName: env.DATALAKE_EEUR_BUCKET_NAME,
bucketAccessKey: env.DATALAKE_EEUR_ACCESS_KEY,
bucketSecretKey: env.DATALAKE_EEUR_SECRET_KEY
}
case 'weur':
return {
location,
bucket: env.DATALAKE_WEUR,
bucketName: env.DATALAKE_WEUR_BUCKET_NAME,
bucketAccessKey: env.DATALAKE_WEUR_ACCESS_KEY,
bucketSecretKey: env.DATALAKE_WEUR_SECRET_KEY
}
case 'enam':
return {
location,
bucket: env.DATALAKE_ENAM,
bucketName: env.DATALAKE_ENAM_BUCKET_NAME,
bucketAccessKey: env.DATALAKE_ENAM_ACCESS_KEY,
bucketSecretKey: env.DATALAKE_ENAM_SECRET_KEY
}
case 'wnam':
return {
location,
bucket: env.DATALAKE_WNAM,
bucketName: env.DATALAKE_WNAM_BUCKET_NAME,
bucketAccessKey: env.DATALAKE_WNAM_ACCESS_KEY,
bucketSecretKey: env.DATALAKE_WNAM_SECRET_KEY
}
}
}
function selectLocation (env: Env, workspace: string): Location {
// TODO select location based on workspace
return 'weur'
}

workers/datalake/src/types.ts Normal file
View File

@ -0,0 +1,32 @@
//
// Copyright © 2024 Hardcore Engineering Inc.
//
// Licensed under the Eclipse Public License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License. You may
// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//
// See the License for the specific language governing permissions and
// limitations under the License.
//
export type Location = 'weur' | 'eeur' | 'wnam' | 'enam' | 'apac'
export type UUID = string & { __uuid: true }
export interface CloudflareResponse {
success: boolean
errors: any
messages: any
result: any
}
export interface StreamUploadResponse extends CloudflareResponse {
result: {
uid: string
uploadURL: string
}
}

workers/datalake/src/video.ts Normal file
View File

@ -0,0 +1,121 @@
//
// Copyright © 2024 Hardcore Engineering Inc.
//
// Licensed under the Eclipse Public License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License. You may
// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//
// See the License for the specific language governing permissions and
// limitations under the License.
//
import { error, json } from 'itty-router'
import { type CloudflareResponse, type StreamUploadResponse } from './types'
export type StreamUploadState = 'ready' | 'error' | 'inprogress' | 'queued' | 'downloading' | 'pendingupload'
// https://developers.cloudflare.com/api/operations/stream-videos-list-videos#response-body
export interface StreamDetailsResponse extends CloudflareResponse {
result: {
uid: string
thumbnail: string
status: {
state: StreamUploadState
}
playback: {
hls: string
dash: string
}
}
}
interface StreamBlobInfo {
streamId: string
}
function streamBlobKey (workspace: string, name: string): string {
return `v/${workspace}/${name}`
}
export async function getVideoMeta (
request: Request,
env: Env,
ctx: ExecutionContext,
workspace: string,
name: string
): Promise<Response> {
const key = streamBlobKey(workspace, name)
const streamInfo = await env.datalake_blobs.get<StreamBlobInfo>(key, { type: 'json' })
if (streamInfo === null) {
return error(404)
}
const url = `https://api.cloudflare.com/client/v4/accounts/${env.STREAMS_ACCOUNT_ID}/stream/${streamInfo.streamId}`
const streamRequest = new Request(url, {
headers: {
Authorization: `Bearer ${env.STREAMS_AUTH_KEY}`,
'Content-Type': 'application/json'
}
})
const streamResponse = await fetch(streamRequest)
const stream = await streamResponse.json<StreamDetailsResponse>()
if (stream.success) {
return json({
status: stream.result.status.state,
thumbnail: stream.result.thumbnail,
hls: stream.result.playback.hls
})
} else {
return error(500, { errors: stream.errors })
}
}
export async function copyVideo (env: Env, source: string, workspace: string, name: string): Promise<void> {
const key = streamBlobKey(workspace, name)
const url = `https://api.cloudflare.com/client/v4/accounts/${env.STREAMS_ACCOUNT_ID}/stream/copy`
const request = new Request(url, {
method: 'POST',
headers: {
Authorization: `Bearer ${env.STREAMS_AUTH_KEY}`,
'Content-Type': 'application/json'
},
body: JSON.stringify({ url: source, meta: { name } })
})
const response = await fetch(request)
const upload = await response.json<StreamUploadResponse>()
if (upload.success) {
const streamInfo: StreamBlobInfo = {
streamId: upload.result.uid
}
await env.datalake_blobs.put(key, JSON.stringify(streamInfo))
}
}
export async function deleteVideo (env: Env, workspace: string, name: string): Promise<void> {
const key = streamBlobKey(workspace, name)
const streamInfo = await env.datalake_blobs.get<StreamBlobInfo>(key, { type: 'json' })
if (streamInfo !== null) {
const url = `https://api.cloudflare.com/client/v4/accounts/${env.STREAMS_ACCOUNT_ID}/stream/${streamInfo.streamId}`
const request = new Request(url, {
method: 'DELETE',
headers: {
Authorization: `Bearer ${env.STREAMS_AUTH_KEY}`,
'Content-Type': 'application/json'
}
})
await Promise.all([fetch(request), env.datalake_blobs.delete(key)])
}
}
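Downstream, a client can poll the metadata route until Stream finishes processing (a sketch, with the same hypothetical names as before):

const res = await fetch(`${base}/video/${workspace}/${name}/meta`)
if (res.ok) {
  const meta = (await res.json()) as { status: string, thumbnail: string, hls: string }
  if (meta.status === 'ready') {
    // hand meta.hls to an HLS-capable player
  }
}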

workers/datalake/tsconfig.json Normal file
View File

@ -0,0 +1,12 @@
{
"extends": "./node_modules/@hcengineering/platform-rig/profiles/default/tsconfig.json",
"compilerOptions": {
"rootDir": "./src",
"outDir": "./lib",
"declarationDir": "./types",
"tsBuildInfoFile": ".build/build.tsbuildinfo",
"types": ["@cloudflare/workers-types", "jest"],
"lib": ["esnext"]
}
}

workers/datalake/worker-configuration.d.ts Normal file
View File

@ -0,0 +1,30 @@
// Generated by Wrangler on Sat Jul 06 2024 18:52:21 GMT+0200 (Central European Summer Time)
// by running `wrangler types`
interface Env {
datalake_blobs: KVNamespace;
DATALAKE_APAC: R2Bucket;
DATALAKE_EEUR: R2Bucket;
DATALAKE_WEUR: R2Bucket;
DATALAKE_ENAM: R2Bucket;
DATALAKE_WNAM: R2Bucket;
HYPERDRIVE: Hyperdrive;
STREAMS_ACCOUNT_ID: string;
STREAMS_AUTH_KEY: string;
R2_ACCOUNT_ID: string;
DATALAKE_APAC_ACCESS_KEY: string;
DATALAKE_APAC_SECRET_KEY: string;
DATALAKE_APAC_BUCKET_NAME: string;
DATALAKE_EEUR_ACCESS_KEY: string;
DATALAKE_EEUR_SECRET_KEY: string;
DATALAKE_EEUR_BUCKET_NAME: string;
DATALAKE_WEUR_ACCESS_KEY: string;
DATALAKE_WEUR_SECRET_KEY: string;
DATALAKE_WEUR_BUCKET_NAME: string;
DATALAKE_ENAM_ACCESS_KEY: string;
DATALAKE_ENAM_SECRET_KEY: string;
DATALAKE_ENAM_BUCKET_NAME: string;
DATALAKE_WNAM_ACCESS_KEY: string;
DATALAKE_WNAM_SECRET_KEY: string;
DATALAKE_WNAM_BUCKET_NAME: string;
}

workers/datalake/wrangler.toml Normal file
View File

@ -0,0 +1,48 @@
#:schema node_modules/wrangler/config-schema.json
name = "datalake-worker"
main = "src/index.ts"
compatibility_date = "2024-07-01"
compatibility_flags = ["nodejs_compat"]
keep_vars = true
kv_namespaces = [
{ binding = "datalake_blobs", id = "64144eb146fd45febc928d44419ebb39", preview_id = "31c6f6e76e7e4524a59f87a4f381de82" }
]
r2_buckets = [
{ binding = "DATALAKE_APAC", bucket_name = "datalake-apac", preview_bucket_name = "dev-datalake-eu-west" },
{ binding = "DATALAKE_EEUR", bucket_name = "datalake-eeur", preview_bucket_name = "dev-datalake-eu-west" },
{ binding = "DATALAKE_WEUR", bucket_name = "datalake-weur", preview_bucket_name = "dev-datalake-eu-west" },
{ binding = "DATALAKE_ENAM", bucket_name = "datalake-enam", preview_bucket_name = "dev-datalake-eu-west" },
{ binding = "DATALAKE_WNAM", bucket_name = "datalake-wnam", preview_bucket_name = "dev-datalake-eu-west" }
]
[[hyperdrive]]
binding = "HYPERDRIVE"
id = "87259c3ae41e41a7b35e610d4282d85a"
localConnectionString = "postgresql://root:roach@localhost:26257/datalake"
[observability]
enabled = true
head_sampling_rate = 1
[vars]
DATALAKE_EEUR_BUCKET_NAME = "datalake-eeur"
# DATALAKE_EEUR_ACCESS_KEY = ""
# DATALAKE_EEUR_SECRET_KEY = ""
DATALAKE_WEUR_BUCKET_NAME = "datalake-weur"
# DATALAKE_WEUR_ACCESS_KEY = ""
# DATALAKE_WEUR_SECRET_KEY = ""
DATALAKE_APAC_BUCKET_NAME = "datalake-apac"
# DATALAKE_APAC_ACCESS_KEY = ""
# DATALAKE_APAC_SECRET_KEY = ""
DATALAKE_ENAM_BUCKET_NAME = "datalake-enam"
# DATALAKE_ENAM_ACCESS_KEY = ""
# DATALAKE_ENAM_SECRET_KEY = ""
DATALAKE_WNAM_BUCKET_NAME = "datalake-wnam"
# DATALAKE_WNAM_ACCESS_KEY = ""
# DATALAKE_WNAM_SECRET_KEY = ""
# STREAMS_ACCOUNT_ID = ""
# STREAMS_AUTH_KEY = ""
# R2_ACCOUNT_ID = ""
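# The commented-out keys above are presumably provisioned as Worker secrets
# rather than committed to the config, e.g. via `wrangler secret put DATALAKE_EEUR_ACCESS_KEY`;
# only the public bucket names live in [vars].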