diff --git a/common/config/rush/pnpm-lock.yaml b/common/config/rush/pnpm-lock.yaml index aba68922b4..b4d7a40838 100644 --- a/common/config/rush/pnpm-lock.yaml +++ b/common/config/rush/pnpm-lock.yaml @@ -155,6 +155,9 @@ dependencies: '@rush-temp/cloud-branding': specifier: file:./projects/cloud-branding.tgz version: file:projects/cloud-branding.tgz(@types/node@20.11.19)(bufferutil@4.0.8)(esbuild@0.20.1)(ts-node@10.9.2)(utf-8-validate@6.0.4) + '@rush-temp/cloud-datalake': + specifier: file:./projects/cloud-datalake.tgz + version: file:projects/cloud-datalake.tgz(@types/node@20.11.19)(bufferutil@4.0.8)(esbuild@0.20.1)(ts-node@10.9.2)(utf-8-validate@6.0.4) '@rush-temp/collaboration': specifier: file:./projects/collaboration.tgz version: file:projects/collaboration.tgz(esbuild@0.20.1)(ts-node@10.9.2) @@ -1394,6 +1397,9 @@ dependencies: aws-sdk: specifier: ^2.1423.0 version: 2.1664.0 + aws4fetch: + specifier: ^1.0.20 + version: 1.0.20 base64-js: specifier: ^1.5.1 version: 1.5.1 @@ -1748,6 +1754,9 @@ dependencies: postcss-loader: specifier: ^7.0.2 version: 7.3.4(postcss@8.4.35)(typescript@5.3.3)(webpack@5.90.3) + postgres: + specifier: ^3.4.4 + version: 3.4.4 posthog-js: specifier: ~1.122.0 version: 1.122.0 @@ -11374,6 +11383,10 @@ packages: xml2js: 0.6.2 dev: false + /aws4fetch@1.0.20: + resolution: {integrity: sha512-/djoAN709iY65ETD6LKCtyyEI04XIBP5xVvfmNxsEP0uJB5tyaGBztSryRr4HqMStr9R06PisQE7m9zDTXKu6g==} + dev: false + /axobject-query@4.0.0: resolution: {integrity: sha512-+60uv1hiVFhHZeO+Lz0RYzsVHy5Wr1ayX0mwda9KPDVLNJgZ1T9Ny7VmFbLDzxsH0D87I86vgj3gFrjTJUYznw==} dependencies: @@ -11548,6 +11561,13 @@ packages: dev: false optional: true + /base32-encode@2.0.0: + resolution: {integrity: sha512-mlmkfc2WqdDtMl/id4qm3A7RjW6jxcbAoMjdRmsPiwQP0ufD4oXItYMnPgVHe80lnAIy+1xwzhHE1s4FoIceSw==} + engines: {node: ^12.20.0 || ^14.13.1 || >=16.0.0} + dependencies: + to-data-view: 2.0.0 + dev: false + /base64-js@1.5.1: resolution: {integrity: sha512-AKpaYlHn8t4SVbOHCy+b5+KKgvR4vrsD8vbvrbiQJps7fKDTkjkDry6ji0rUJjC0kzbNePLwzxq8iypo41qeWA==} dev: false @@ -20790,6 +20810,11 @@ packages: resolution: {integrity: sha512-i/hbxIE9803Alj/6ytL7UHQxRvZkI9O4Sy+J3HGc4F4oo/2eQAjTSNJ0bfxyse3bH0nuVesCk+3IRLaMtG3H6w==} dev: false + /postgres@3.4.4: + resolution: {integrity: sha512-IbyN+9KslkqcXa8AO9fxpk97PA4pzewvpi2B3Dwy9u4zpV32QicaEdgmF3eSQUzdRk7ttDHQejNgAEr4XoeH4A==} + engines: {node: '>=12'} + dev: false + /posthog-js@1.122.0: resolution: {integrity: sha512-+8R2/nLaWyI5Jp2Ly7L52qcgDFU3xryyoNG52DPJ8dlGnagphxIc0mLNGurgyKeeTGycsOsuOIP4dtofv3ZoBA==} deprecated: This version of posthog-js is deprecated, please update posthog-js, and do not use this version! 
Check out our JS docs at https://posthog.com/docs/libraries/js @@ -23505,6 +23530,11 @@ packages: resolution: {integrity: sha512-3f0uOEAQwIqGuWW2MVzYg8fV/QNnc/IpuJNG837rLuczAaLVHslWHZQj4IGiEl5Hs3kkbhwL9Ab7Hrsmuj+Smw==} dev: false + /to-data-view@2.0.0: + resolution: {integrity: sha512-RGEM5KqlPHr+WVTPmGNAXNeFEmsBnlkxXaIfEpUYV0AST2Z5W1EGq9L/MENFrMMmL2WQr1wjkmZy/M92eKhjYA==} + engines: {node: ^12.20.0 || ^14.13.1 || >=16.0.0} + dev: false + /to-fast-properties@2.0.0: resolution: {integrity: sha512-/OaKK0xYrs3DmxRYqL/yDc+FxFUVYhDlXMhRmv3z915w2HF1tnN1omB354j8VUGO/hbRzyD6Y3sA7v7GS/ceog==} engines: {node: '>=4'} @@ -26627,6 +26657,44 @@ packages: - utf-8-validate dev: false + file:projects/cloud-datalake.tgz(@types/node@20.11.19)(bufferutil@4.0.8)(esbuild@0.20.1)(ts-node@10.9.2)(utf-8-validate@6.0.4): + resolution: {integrity: sha512-AA2lTsmPKPeYA1MTwIscZFRO40m9Ctc59Er2x8VRLNBBt4mQ01b1CCay4VFVPWYxAzh+Ru9RoUIB7lS+m8sj9Q==, tarball: file:projects/cloud-datalake.tgz} + id: file:projects/cloud-datalake.tgz + name: '@rush-temp/cloud-datalake' + version: 0.0.0 + dependencies: + '@cloudflare/workers-types': 4.20241004.0 + '@types/jest': 29.5.12 + '@typescript-eslint/eslint-plugin': 6.21.0(@typescript-eslint/parser@6.21.0)(eslint@8.56.0)(typescript@5.6.2) + '@typescript-eslint/parser': 6.21.0(eslint@8.56.0)(typescript@5.6.2) + aws4fetch: 1.0.20 + base32-encode: 2.0.0 + eslint: 8.56.0 + eslint-config-standard-with-typescript: 40.0.0(@typescript-eslint/eslint-plugin@6.21.0)(eslint-plugin-import@2.29.1)(eslint-plugin-n@15.7.0)(eslint-plugin-promise@6.1.1)(eslint@8.56.0)(typescript@5.6.2) + eslint-plugin-import: 2.29.1(eslint@8.56.0) + eslint-plugin-n: 15.7.0(eslint@8.56.0) + eslint-plugin-promise: 6.1.1(eslint@8.56.0) + itty-router: 5.0.18 + jest: 29.7.0(@types/node@20.11.19)(ts-node@10.9.2) + postgres: 3.4.4 + prettier: 3.2.5 + ts-jest: 29.1.2(esbuild@0.20.1)(jest@29.7.0)(typescript@5.6.2) + typescript: 5.6.2 + wrangler: 3.80.2(@cloudflare/workers-types@4.20241004.0)(bufferutil@4.0.8)(utf-8-validate@6.0.4) + transitivePeerDependencies: + - '@babel/core' + - '@jest/types' + - '@types/node' + - babel-jest + - babel-plugin-macros + - bufferutil + - esbuild + - node-notifier + - supports-color + - ts-node + - utf-8-validate + dev: false + file:projects/collaboration.tgz(esbuild@0.20.1)(ts-node@10.9.2): resolution: {integrity: sha512-krhgq1XiDnWKIP/HUM8VQgEzXdxLNfDf68lZgDl/Yl2tEFUu8yLYpzd1qWVMkl8N0dXyGts+DEFC7Ntns48lgA==, tarball: file:projects/collaboration.tgz} id: file:projects/collaboration.tgz @@ -27018,6 +27086,7 @@ packages: name: '@rush-temp/datalake' version: 0.0.0 dependencies: + '@aws-sdk/client-s3': 3.577.0 '@types/jest': 29.5.12 '@types/node': 20.11.19 '@types/node-fetch': 2.6.11 diff --git a/desktop/src/ui/platform.ts b/desktop/src/ui/platform.ts index 359874b35f..0fb19be3e6 100644 --- a/desktop/src/ui/platform.ts +++ b/desktop/src/ui/platform.ts @@ -96,7 +96,7 @@ import '@hcengineering/analytics-collector-assets' import '@hcengineering/text-editor-assets' import { coreId } from '@hcengineering/core' -import presentation, { parsePreviewConfig, presentationId } from '@hcengineering/presentation' +import presentation, { parsePreviewConfig, parseUploadConfig, presentationId } from '@hcengineering/presentation' import textEditor, { textEditorId } from '@hcengineering/text-editor' import love, { loveId } from '@hcengineering/love' import print, { printId } from '@hcengineering/print' @@ -205,6 +205,7 @@ export async function configurePlatform (): Promise { setMetadata(presentation.metadata.FilesURL, 
config.FILES_URL)
   setMetadata(presentation.metadata.CollaboratorUrl, config.COLLABORATOR_URL)
   setMetadata(presentation.metadata.PreviewConfig, parsePreviewConfig(config.PREVIEW_CONFIG))
+  setMetadata(presentation.metadata.UploadConfig, parseUploadConfig(config.UPLOAD_CONFIG, config.UPLOAD_URL))
   setMetadata(presentation.metadata.FrontUrl, config.FRONT_URL)
 
   setMetadata(textEditor.metadata.Collaborator, config.COLLABORATOR ?? '')
diff --git a/desktop/src/ui/types.ts b/desktop/src/ui/types.ts
index 55e8e39d66..6e9c37e474 100644
--- a/desktop/src/ui/types.ts
+++ b/desktop/src/ui/types.ts
@@ -30,6 +30,7 @@ export interface Config {
   AI_URL?:string
   BRANDING_URL?: string
   PREVIEW_CONFIG: string
+  UPLOAD_CONFIG: string
   DESKTOP_UPDATES_URL?: string
   DESKTOP_UPDATES_CHANNEL?: string
   TELEGRAM_BOT_URL?: string
diff --git a/dev/prod/src/platform.ts b/dev/prod/src/platform.ts
index a43cf316fc..68031ac2e4 100644
--- a/dev/prod/src/platform.ts
+++ b/dev/prod/src/platform.ts
@@ -108,7 +108,12 @@ import github, { githubId } from '@hcengineering/github'
 import '@hcengineering/github-assets'
 
 import { coreId } from '@hcengineering/core'
-import presentation, { loadServerConfig, parsePreviewConfig, presentationId } from '@hcengineering/presentation'
+import presentation, {
+  loadServerConfig,
+  parsePreviewConfig,
+  parseUploadConfig,
+  presentationId
+} from '@hcengineering/presentation'
 
 import { setMetadata } from '@hcengineering/platform'
 import { setDefaultLanguage, initThemeStore } from '@hcengineering/theme'
@@ -150,6 +155,7 @@ export interface Config {
   // Could be defined for dev environment
   FRONT_URL?: string
   PREVIEW_CONFIG?: string
+  UPLOAD_CONFIG?: string
 }
 
 export interface Branding {
@@ -292,6 +298,7 @@ export async function configurePlatform() {
 
   setMetadata(presentation.metadata.FrontUrl, config.FRONT_URL)
   setMetadata(presentation.metadata.PreviewConfig, parsePreviewConfig(config.PREVIEW_CONFIG))
+  setMetadata(presentation.metadata.UploadConfig, parseUploadConfig(config.UPLOAD_CONFIG, config.UPLOAD_URL))
 
   setMetadata(textEditor.metadata.Collaborator, config.COLLABORATOR)
 
diff --git a/packages/presentation/src/file.ts b/packages/presentation/src/file.ts
index 84e435ec17..5d1fa55756 100644
--- a/packages/presentation/src/file.ts
+++ b/packages/presentation/src/file.ts
@@ -13,12 +13,30 @@
 // limitations under the License.
 //
 
-import { concatLink, type Blob, type Ref } from '@hcengineering/core'
+import { concatLink, type Blob as PlatformBlob, type Ref } from '@hcengineering/core'
 import { PlatformError, Severity, Status, getMetadata } from '@hcengineering/platform'
 import { v4 as uuid } from 'uuid'
 
 import plugin from './plugin'
 
+export type FileUploadMethod = 'form-data' | 'signed-url'
+
+export interface UploadConfig {
+  'form-data': {
+    url: string
+  }
+  'signed-url'?: {
+    url: string
+    size: number
+  }
+}
+
+export interface FileUploadParams {
+  method: FileUploadMethod
+  url: string
+  headers: Record<string, string>
+}
+
 interface FileUploadError {
   key: string
   error: string
@@ -34,6 +52,46 @@ type FileUploadResult = FileUploadSuccess | FileUploadError
 
 const defaultUploadUrl = '/files'
 const defaultFilesUrl = '/files/:workspace/:filename?file=:blobId&workspace=:workspace'
 
+function parseInt (value: string, fallback: number): number {
+  const number = Number.parseInt(value, 10)
+  return Number.isInteger(number) ? number : fallback
+}
+
+export function parseUploadConfig (config: string | undefined, uploadUrl: string): UploadConfig {
+  const uploadConfig: UploadConfig = {
+    'form-data': { url: uploadUrl },
+    'signed-url': undefined
+  }
+
+  if (config !== undefined) {
+    const configs = config.split(';')
+    for (const c of configs) {
+      if (c === '') {
+        continue
+      }
+
+      const [key, size, url] = c.split('|')
+
+      if (url === undefined || url === '') {
+        throw new Error(`Bad upload config: ${c}`)
+      }
+
+      if (key === 'form-data') {
+        uploadConfig['form-data'] = { url }
+      } else if (key === 'signed-url') {
+        uploadConfig['signed-url'] = {
+          url,
+          size: parseInt(size, 0) * 1024 * 1024
+        }
+      } else {
+        throw new Error(`Unknown upload config key: ${key}`)
+      }
+    }
+  }
+
+  return uploadConfig
+}
+
 function getFilesUrl (): string {
   const filesUrl = getMetadata(plugin.metadata.FilesURL) ?? defaultFilesUrl
   const frontUrl = getMetadata(plugin.metadata.FrontUrl) ?? window.location.origin
@@ -61,6 +119,42 @@ export function getUploadUrl (): string {
   return template.replaceAll(':workspace', encodeURIComponent(getCurrentWorkspaceId()))
 }
 
+function getUploadConfig (): UploadConfig {
+  return getMetadata(plugin.metadata.UploadConfig) ?? { 'form-data': { url: getUploadUrl() } }
+}
+
+function getFileUploadMethod (blob: Blob): { method: FileUploadMethod, url: string } {
+  const config = getUploadConfig()
+
+  const signedUrl = config['signed-url']
+  if (signedUrl !== undefined && signedUrl.size < blob.size) {
+    return { method: 'signed-url', url: signedUrl.url }
+  }
+
+  return { method: 'form-data', url: config['form-data'].url }
+}
+
+/**
+ * @public
+ */
+export function getFileUploadParams (blobId: string, blob: Blob): FileUploadParams {
+  const workspaceId = encodeURIComponent(getCurrentWorkspaceId())
+  const fileId = encodeURIComponent(blobId)
+
+  const { method, url: urlTemplate } = getFileUploadMethod(blob)
+
+  const url = urlTemplate.replaceAll(':workspace', workspaceId).replaceAll(':blobId', fileId)
+
+  const headers: Record<string, string> =
+    method !== 'signed-url'
+      ? {
+          Authorization: 'Bearer ' + (getMetadata(plugin.metadata.Token) as string)
+        }
+      : {}
+
+  return { method, url, headers }
+}
+
 /**
  * @public
  */
@@ -79,12 +173,40 @@ export function getFileUrl (file: string, filename?: string): string {
 /**
  * @public
  */
-export async function uploadFile (file: File): Promise<Ref<Blob>> {
-  const uploadUrl = getUploadUrl()
-
+export async function uploadFile (file: File): Promise<Ref<PlatformBlob>> {
   const id = generateFileId()
+  const params = getFileUploadParams(id, file)
+
+  if (params.method === 'signed-url') {
+    await uploadFileWithSignedUrl(file, id, params.url)
+  } else {
+    await uploadFileWithFormData(file, id, params.url)
+  }
+
+  return id as Ref<PlatformBlob>
+}
+
+/**
+ * @public
+ */
+export async function deleteFile (id: string): Promise<void> {
+  const fileUrl = getFileUrl(id)
+
+  const resp = await fetch(fileUrl, {
+    method: 'DELETE',
+    headers: {
+      Authorization: 'Bearer ' + (getMetadata(plugin.metadata.Token) as string)
+    }
+  })
+
+  if (resp.status !== 200) {
+    throw new Error('Failed to delete file')
+  }
+}
+
+async function uploadFileWithFormData (file: File, uuid: string, uploadUrl: string): Promise<void> {
   const data = new FormData()
-  data.append('file', file, id)
+  data.append('file', file, uuid)
 
   const resp = await fetch(uploadUrl, {
     method: 'POST',
@@ -110,24 +232,54 @@ export async function uploadFile (file: File): Promise<Ref<Blob>> {
   if ('error' in result[0]) {
     throw Error(`Failed to upload file: ${result[0].error}`)
   }
-
-  return id as Ref<Blob>
 }
 
-/**
- * @public
- */
-export async function deleteFile (id: string): Promise<void> {
-  const fileUrl = getFileUrl(id)
-
-  const resp = await fetch(fileUrl, {
-    method: 'DELETE',
+async function uploadFileWithSignedUrl (file: File, uuid: string, uploadUrl: string): Promise<void> {
+  const response = await fetch(uploadUrl, {
+    method: 'POST',
     headers: {
       Authorization: 'Bearer ' + (getMetadata(plugin.metadata.Token) as string)
     }
   })
 
-  if (resp.status !== 200) {
-    throw new Error('Failed to delete file')
+  if (!response.ok) {
+    throw Error(`Failed to generate signed upload URL: ${response.statusText}`)
+  }
+
+  const signedUrl = await response.text()
+  if (signedUrl === undefined || signedUrl === '') {
+    throw Error('Missing signed upload URL')
+  }
+
+  try {
+    const response = await fetch(signedUrl, {
+      body: file,
+      method: 'PUT',
+      headers: {
+        'Content-Type': file.type,
+        'Content-Length': file.size.toString(),
+        'x-amz-meta-last-modified': file.lastModified.toString()
+      }
+    })
+
+    if (!response.ok) {
+      throw Error(`Failed to upload file: ${response.statusText}`)
+    }
+
+    // confirm we uploaded file
+    await fetch(uploadUrl, {
+      method: 'PUT',
+      headers: {
+        Authorization: 'Bearer ' + (getMetadata(plugin.metadata.Token) as string)
+      }
+    })
+  } catch (err) {
+    // abort the upload, then surface the failure to the caller
+    await fetch(uploadUrl, {
+      method: 'DELETE',
+      headers: {
+        Authorization: 'Bearer ' + (getMetadata(plugin.metadata.Token) as string)
+      }
+    })
+    throw err
   }
 }
diff --git a/packages/presentation/src/plugin.ts b/packages/presentation/src/plugin.ts
index 067ccd235a..0f826224c4 100644
--- a/packages/presentation/src/plugin.ts
+++ b/packages/presentation/src/plugin.ts
@@ -43,6 +43,7 @@ import {
   type InstantTransactions,
   type ObjectSearchCategory
 } from './types'
+import { type UploadConfig } from './file'
 
 /**
  * @public
@@ -138,6 +139,7 @@ export default plugin(presentationId, {
     Workspace: '' as Metadata,
     WorkspaceId: '' as Metadata,
     FrontUrl: '' as Asset,
+    UploadConfig: '' as Metadata<UploadConfig>,
     PreviewConfig: '' as Metadata,
     ClientHook: '' as Metadata,
     SessionId: '' as Metadata
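The helpers above decide the transport per blob: files below the configured threshold go through the authenticated multipart `form-data` endpoint, while larger blobs are uploaded directly to storage via a signed URL (POST to obtain the URL, PUT the bytes, then PUT to confirm or DELETE to abort). A minimal usage sketch, assuming a workspace whose `UploadConfig` metadata carries a hypothetical 100 MiB `signed-url` threshold; the file names and sizes are illustrative only:

```ts
import { generateFileId, getFileUploadParams, uploadFile } from '@hcengineering/presentation'

async function example (): Promise<void> {
  // a 4 KiB blob stays below the threshold and is sent as multipart form data
  const small = new File([new Uint8Array(4 * 1024)], 'small.bin')
  console.log(getFileUploadParams(generateFileId(), small).method) // 'form-data'

  // a 200 MiB blob exceeds the assumed 100 MiB threshold and takes the signed URL path
  const large = new File([new Uint8Array(200 * 1024 * 1024)], 'large.bin')
  console.log(getFileUploadParams(generateFileId(), large).method) // 'signed-url'

  // uploadFile dispatches on the selected method and returns the blob ref
  const ref = await uploadFile(small)
  console.log(ref)
}
```

diff --git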
a/plugins/attachment-resources/src/components/AttachmentStyleBoxCollabEditor.svelte b/plugins/attachment-resources/src/components/AttachmentStyleBoxCollabEditor.svelte index a65b770905..84e415dff8 100644 --- a/plugins/attachment-resources/src/components/AttachmentStyleBoxCollabEditor.svelte +++ b/plugins/attachment-resources/src/components/AttachmentStyleBoxCollabEditor.svelte @@ -35,7 +35,7 @@ getModelRefActions } from '@hcengineering/text-editor-resources' import { AnySvelteComponent, getEventPositionElement, getPopupPositionElement, navigate } from '@hcengineering/ui' - import { uploadFiles } from '@hcengineering/uploader' + import { type FileUploadCallbackParams, uploadFiles } from '@hcengineering/uploader' import view from '@hcengineering/view' import { getCollaborationUser, getObjectId, getObjectLinkFragment } from '@hcengineering/view-resources' import { Analytics } from '@hcengineering/analytics' @@ -135,14 +135,7 @@ progress = true - await uploadFiles( - list, - { objectId: object._id, objectClass: object._class }, - {}, - async (uuid, name, file, path, metadata) => { - await createAttachment(uuid, name, file, metadata) - } - ) + await uploadFiles(list, { onFileUploaded }) inputFile.value = '' progress = false @@ -151,14 +144,7 @@ async function attachFiles (files: File[] | FileList): Promise { progress = true if (files.length > 0) { - await uploadFiles( - files, - { objectId: object._id, objectClass: object._class }, - {}, - async (uuid, name, file, path, metadata) => { - await createAttachment(uuid, name, file, metadata) - } - ) + await uploadFiles(files, { onFileUploaded }) } progress = false } @@ -174,6 +160,10 @@ } } + async function onFileUploaded ({ uuid, name, file, metadata }: FileUploadCallbackParams): Promise { + await createAttachment(uuid, name, file, metadata) + } + async function createAttachment ( uuid: Ref, name: string, diff --git a/plugins/attachment-resources/src/components/AttachmentStyledBox.svelte b/plugins/attachment-resources/src/components/AttachmentStyledBox.svelte index 2f16f8ad50..f8e7c0fe7b 100644 --- a/plugins/attachment-resources/src/components/AttachmentStyledBox.svelte +++ b/plugins/attachment-resources/src/components/AttachmentStyledBox.svelte @@ -13,7 +13,7 @@ // limitations under the License. --> {#if object && version} diff --git a/plugins/drive-resources/src/utils.ts b/plugins/drive-resources/src/utils.ts index 52473b0281..38d87e1ef2 100644 --- a/plugins/drive-resources/src/utils.ts +++ b/plugins/drive-resources/src/utils.ts @@ -177,7 +177,14 @@ export async function uploadFilesToDrive (dt: DataTransfer, space: Ref, p ? 
{ objectId: parent, objectClass: drive.class.Folder } : { objectId: space, objectClass: drive.class.Drive } - await uploadFiles(files, target, {}, onFileUploaded) + const options = { + onFileUploaded, + showProgress: { + target + } + } + + await uploadFiles(files, options) } export async function uploadFilesToDrivePopup (space: Ref, parent: Ref): Promise { @@ -189,12 +196,15 @@ export async function uploadFilesToDrivePopup (space: Ref, parent: Ref, parent: Ref): Prom return current } - const callback: FileUploadCallback = async (uuid, name, file, path, metadata) => { + const callback: FileUploadCallback = async ({ uuid, name, file, path, metadata }) => { const folder = await findParent(path) try { const data = { diff --git a/plugins/uploader-resources/src/types.ts b/plugins/uploader-resources/src/types.ts new file mode 100644 index 0000000000..5997123f75 --- /dev/null +++ b/plugins/uploader-resources/src/types.ts @@ -0,0 +1,28 @@ +// +// Copyright © 2024 Hardcore Engineering Inc. +// +// Licensed under the Eclipse Public License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. You may +// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// +// See the License for the specific language governing permissions and +// limitations under the License. +// +import type { IndexedObject } from '@uppy/core' + +// For Uppy 4.0 compatibility +export type Meta = IndexedObject +export type Body = IndexedObject + +/** @public */ +export type UppyMeta = Meta & { + uuid: string + relativePath?: string +} + +/** @public */ +export type UppyBody = Body diff --git a/plugins/uploader-resources/src/uppy.ts b/plugins/uploader-resources/src/uppy.ts index 84a7316e2d..fe31aa2733 100644 --- a/plugins/uploader-resources/src/uppy.ts +++ b/plugins/uploader-resources/src/uppy.ts @@ -14,12 +14,18 @@ // import { type Blob, type Ref, generateId } from '@hcengineering/core' -import { getMetadata } from '@hcengineering/platform' -import presentation, { generateFileId, getFileMetadata, getUploadUrl } from '@hcengineering/presentation' +import { getMetadata, PlatformError, unknownError } from '@hcengineering/platform' +import presentation, { + type FileUploadParams, + generateFileId, + getFileMetadata, + getFileUploadParams, + getUploadUrl +} from '@hcengineering/presentation' import { getCurrentLanguage } from '@hcengineering/theme' -import type { FileUploadCallback, FileUploadOptions } from '@hcengineering/uploader' +import type { FileUploadOptions } from '@hcengineering/uploader' -import Uppy, { type IndexedObject, type UppyOptions } from '@uppy/core' +import Uppy, { type UppyFile, type UppyOptions } from '@uppy/core' import XHR from '@uppy/xhr-upload' import En from '@uppy/locales/lib/en_US' @@ -29,6 +35,8 @@ import Pt from '@uppy/locales/lib/pt_PT' import Ru from '@uppy/locales/lib/ru_RU' import Zh from '@uppy/locales/lib/zh_CN' +import type { UppyBody, UppyMeta } from './types' + type Locale = UppyOptions['locale'] const locales: Record = { @@ -44,22 +52,65 @@ function getUppyLocale (lang: string): Locale { return locales[lang] ?? 
En
 }
 
-// For Uppy 4.0 compatibility
-type Meta = IndexedObject<any>
-type Body = IndexedObject<any>
+interface XHRFileProcessor {
+  name: string
+  onBeforeUpload: (uppy: Uppy<UppyMeta, UppyBody>, file: UppyFile<UppyMeta, UppyBody>, params: FileUploadParams) => Promise<void>
+  onAfterUpload: (uppy: Uppy<UppyMeta, UppyBody>, file: UppyFile<UppyMeta, UppyBody>, params: FileUploadParams) => Promise<void>
+}
 
-/** @public */
-export type UppyMeta = Meta & {
-  relativePath?: string
+const FormDataFileProcessor: XHRFileProcessor = {
+  name: 'form-data',
+
+  onBeforeUpload: async (uppy: Uppy<UppyMeta, UppyBody>, file: UppyFile<UppyMeta, UppyBody>, { url, headers }: FileUploadParams): Promise<void> => {
+    const xhrUpload = 'xhrUpload' in file && typeof file.xhrUpload === 'object' ? file.xhrUpload : {}
+    const state = {
+      xhrUpload: {
+        ...xhrUpload,
+        endpoint: url,
+        method: 'POST',
+        formData: true,
+        headers
+      }
+    }
+    uppy.setFileState(file.id, state)
+  },
+
+  onAfterUpload: async (uppy: Uppy<UppyMeta, UppyBody>, file: UppyFile<UppyMeta, UppyBody>, params: FileUploadParams): Promise<void> => {}
+}
+
+const SignedURLFileProcessor: XHRFileProcessor = {
+  name: 'signed-url',
+
+  onBeforeUpload: async (uppy: Uppy<UppyMeta, UppyBody>, file: UppyFile<UppyMeta, UppyBody>, { url, headers }: FileUploadParams): Promise<void> => {
+    const xhrUpload = 'xhrUpload' in file && typeof file.xhrUpload === 'object' ? file.xhrUpload : {}
+    const signedUrl = await getSignedUploadUrl(file, url)
+    const state = {
+      xhrUpload: {
+        ...xhrUpload,
+        formData: false,
+        method: 'PUT',
+        endpoint: signedUrl,
+        headers: {
+          ...headers,
+          // S3 direct upload does not require authorization
+          Authorization: '',
+          'Content-Type': file.type
+        }
+      }
+    }
+    uppy.setFileState(file.id, state)
+  },
+
+  onAfterUpload: async (uppy: Uppy<UppyMeta, UppyBody>, file: UppyFile<UppyMeta, UppyBody>, params: FileUploadParams): Promise<void> => {
+    const error = 'error' in file && file.error != null
+    await fetch(params.url, { method: error ? 'DELETE' : 'PUT' })
+  }
 }
 
 /** @public */
-export type UppyBody = Body & {
-  uuid: string
-}
+export function getUppy (options: FileUploadOptions): Uppy<UppyMeta, UppyBody> {
+  const { onFileUploaded } = options
 
-/** @public */
-export function getUppy (options: FileUploadOptions, onFileUploaded?: FileUploadCallback): Uppy<UppyMeta, UppyBody> {
   const uppyOptions: Partial<UppyOptions> = {
     id: generateId(),
     locale: getUppyLocale(getCurrentLanguage()),
@@ -71,31 +122,14 @@
     }
   }
 
-  const uppy = new Uppy(uppyOptions).use(XHR, {
-    endpoint: getUploadUrl(),
-    method: 'POST',
-    headers: {
-      Authorization: 'Bearer ' + (getMetadata(presentation.metadata.Token) as string)
-    },
-    getResponseError: (_, response) => {
-      return new Error((response as Response).statusText)
-    }
-  })
-
-  // Hack to setup shouldRetry callback on xhrUpload that is not exposed in options
-  const xhrUpload = uppy.getState().xhrUpload ?? {}
-  uppy.getState().xhrUpload = {
-    ...xhrUpload,
-    shouldRetry: (response: Response) => response.status !== 413
-  }
+  const uppy = new Uppy(uppyOptions)
 
+  // Ensure we always have UUID
   uppy.addPreProcessor(async (fileIds: string[]) => {
     for (const fileId of fileIds) {
       const file = uppy.getFile(fileId)
-      if (file != null) {
-        const uuid = generateFileId()
-        file.meta.uuid = uuid
-        file.meta.name = uuid
+      if (file != null && file.meta.uuid === undefined) {
+        uppy.setFileMeta(fileId, { uuid: generateFileId() })
       }
     }
   })
@@ -111,11 +145,78 @@
         const uuid = file.meta.uuid as Ref<Blob>
         if (uuid !== undefined) {
           const metadata = await getFileMetadata(file.data, uuid)
-          await onFileUploaded(uuid, file.name, file.data, file.meta.relativePath, metadata)
+          await onFileUploaded({
+            uuid,
+            name: file.name,
+            file: file.data,
+            path: file.meta.relativePath,
+            metadata
+          })
+        } else {
+          console.warn('missing file metadata uuid', file)
         }
       }
     })
   }
 
+  configureXHR(uppy)
+
   return uppy
 }
+
+function configureXHR (uppy: Uppy<UppyMeta, UppyBody>): Uppy<UppyMeta, UppyBody> {
+  uppy.use(XHR, {
+    endpoint: getUploadUrl(),
+    method: 'POST',
+    headers: {
+      Authorization: 'Bearer ' + (getMetadata(presentation.metadata.Token) as string)
+    },
+    getResponseError: (_, response) => {
+      return new Error((response as Response).statusText)
+    }
+  })
+
+  // Hack to setup shouldRetry callback on xhrUpload that is not exposed in options
+  const xhrUpload = uppy.getState().xhrUpload ?? {}
+  uppy.getState().xhrUpload = {
+    ...xhrUpload,
+    // retry unless the server reports an unauthorized, forbidden or payload-too-large error
+    shouldRetry: (response: Response) => ![401, 403, 413].includes(response.status)
+  }
+
+  uppy.addPreProcessor(async (fileIds: string[]) => {
+    for (const fileId of fileIds) {
+      const file = uppy.getFile(fileId)
+      if (file != null) {
+        const params = getFileUploadParams(file.meta.uuid, file.data)
+        const processor = getXHRProcessor(file, params)
+        await processor.onBeforeUpload(uppy, file, params)
+      }
+    }
+  })
+
+  uppy.addPostProcessor(async (fileIds: string[]) => {
+    for (const fileId of fileIds) {
+      const file = uppy.getFile(fileId)
+      if (file != null) {
+        const params = getFileUploadParams(file.meta.uuid, file.data)
+        const processor = getXHRProcessor(file, params)
+        await processor.onAfterUpload(uppy, file, params)
+      }
+    }
+  })
+
+  return uppy
+}
+
+function getXHRProcessor (file: UppyFile<UppyMeta, UppyBody>, params: FileUploadParams): XHRFileProcessor {
+  return params.method === 'form-data' ?
FormDataFileProcessor : SignedURLFileProcessor +} + +async function getSignedUploadUrl (file: UppyFile, signUrl: string): Promise { + const response = await fetch(signUrl, { method: 'POST' }) + if (!response.ok) { + throw new PlatformError(unknownError('Failed to get signed upload url')) + } + + return await response.text() +} diff --git a/plugins/uploader-resources/src/utils.ts b/plugins/uploader-resources/src/utils.ts index 41c625aaea..d6c667e714 100644 --- a/plugins/uploader-resources/src/utils.ts +++ b/plugins/uploader-resources/src/utils.ts @@ -14,54 +14,45 @@ // import { showPopup } from '@hcengineering/ui' -import { - type FileUploadCallback, - type FileUploadOptions, - type FileUploadPopupOptions, - type FileUploadTarget, - toFileWithPath -} from '@hcengineering/uploader' +import { type FileUploadOptions, type FileUploadPopupOptions, toFileWithPath } from '@hcengineering/uploader' import FileUploadPopup from './components/FileUploadPopup.svelte' import { dockFileUpload } from './store' import { getUppy } from './uppy' +import { generateFileId } from '@hcengineering/presentation' /** @public */ export async function showFilesUploadPopup ( - target: FileUploadTarget, options: FileUploadOptions, - popupOptions: FileUploadPopupOptions, - onFileUploaded: FileUploadCallback + popupOptions: FileUploadPopupOptions ): Promise { - const uppy = getUppy(options, onFileUploaded) + const uppy = getUppy(options) - showPopup(FileUploadPopup, { uppy, target, options: popupOptions }, undefined, (res) => { - if (res === true && options.hideProgress !== true) { + showPopup(FileUploadPopup, { uppy, options: popupOptions }, undefined, (res) => { + if (res === true && options.showProgress !== undefined) { + const { target } = options.showProgress dockFileUpload(target, uppy) } }) } /** @public */ -export async function uploadFiles ( - files: File[] | FileList, - target: FileUploadTarget, - options: FileUploadOptions, - onFileUploaded: FileUploadCallback -): Promise { +export async function uploadFiles (files: File[] | FileList, options: FileUploadOptions): Promise { const items = Array.from(files, (p) => toFileWithPath(p)) if (items.length === 0) return - const uppy = getUppy(options, onFileUploaded) + const uppy = getUppy(options) for (const data of items) { const { name, type, relativePath } = data - uppy.addFile({ name, type, data, meta: { relativePath } }) + const uuid = generateFileId() + uppy.addFile({ name, type, data, meta: { name: uuid, uuid, relativePath } }) } - if (options.hideProgress !== true) { + if (options.showProgress !== undefined) { + const { target } = options.showProgress dockFileUpload(target, uppy) } diff --git a/plugins/uploader/src/types.ts b/plugins/uploader/src/types.ts index 6d81bec926..b2315c8f3e 100644 --- a/plugins/uploader/src/types.ts +++ b/plugins/uploader/src/types.ts @@ -21,20 +21,10 @@ export interface FileWithPath extends File { } /** @public */ -export type UploadFilesPopupFn = ( - target: FileUploadTarget, - options: FileUploadOptions, - popupOptions: FileUploadPopupOptions, - onFileUploaded: FileUploadCallback -) => Promise +export type UploadFilesPopupFn = (options: FileUploadOptions, popupOptions: FileUploadPopupOptions) => Promise /** @public */ -export type UploadFilesFn = ( - files: File[] | FileList, - target: FileUploadTarget, - options: FileUploadOptions, - onFileUploaded: FileUploadCallback -) => Promise +export type UploadFilesFn = (files: File[] | FileList, options: FileUploadOptions) => Promise /** @public */ export interface FileUploadTarget { @@ 
-42,12 +32,20 @@ export interface FileUploadTarget { objectClass: Ref> } +/** @public */ +export interface FileUploadProgressOptions { + target: FileUploadTarget +} + /** @public */ export interface FileUploadOptions { + // Uppy options maxFileSize?: number maxNumberOfFiles?: number allowedFileTypes?: string[] | null - hideProgress?: boolean + + onFileUploaded?: FileUploadCallback + showProgress?: FileUploadProgressOptions } /** @public */ @@ -56,10 +54,13 @@ export interface FileUploadPopupOptions { } /** @public */ -export type FileUploadCallback = ( - uuid: Ref, - name: string, - file: FileWithPath | Blob, - path: string | undefined, +export interface FileUploadCallbackParams { + uuid: Ref + name: string + file: FileWithPath | Blob + path: string | undefined metadata: Record | undefined -) => Promise +} + +/** @public */ +export type FileUploadCallback = (params: FileUploadCallbackParams) => Promise diff --git a/plugins/uploader/src/utils.ts b/plugins/uploader/src/utils.ts index cadbf0b618..9199b5f40b 100644 --- a/plugins/uploader/src/utils.ts +++ b/plugins/uploader/src/utils.ts @@ -16,34 +16,27 @@ import { getResource } from '@hcengineering/platform' import uploader from './plugin' -import type { - FileUploadCallback, - FileUploadOptions, - FileUploadPopupOptions, - FileUploadTarget, - FileWithPath -} from './types' +import type { FileUploadOptions, FileUploadPopupOptions, FileWithPath } from './types' /** @public */ export async function showFilesUploadPopup ( - target: FileUploadTarget, options: FileUploadOptions, - popupOptions: FileUploadPopupOptions, - onFileUploaded: FileUploadCallback + popupOptions: FileUploadPopupOptions ): Promise { const fn = await getResource(uploader.function.ShowFilesUploadPopup) - await fn(target, options, popupOptions, onFileUploaded) + await fn(options, popupOptions) } /** @public */ -export async function uploadFiles ( - files: File[] | FileList, - target: FileUploadTarget, - options: FileUploadOptions, - onFileUploaded: FileUploadCallback -): Promise { +export async function uploadFile (file: File, options: FileUploadOptions): Promise { const fn = await getResource(uploader.function.UploadFiles) - await fn(files, target, options, onFileUploaded) + await fn([file], options) +} + +/** @public */ +export async function uploadFiles (files: File[] | FileList, options: FileUploadOptions): Promise { + const fn = await getResource(uploader.function.UploadFiles) + await fn(files, options) } /** @public */ diff --git a/rush.json b/rush.json index e73a75304b..34ddb321b7 100644 --- a/rush.json +++ b/rush.json @@ -2115,6 +2115,11 @@ "packageName": "@hcengineering/cloud-branding", "projectFolder": "workers/branding", "shouldPublish": false + }, + { + "packageName": "@hcengineering/cloud-datalake", + "projectFolder": "workers/datalake", + "shouldPublish": false } ] } diff --git a/server/datalake/src/client.ts b/server/datalake/src/client.ts index 5571afce5a..6990cb60f9 100644 --- a/server/datalake/src/client.ts +++ b/server/datalake/src/client.ts @@ -15,7 +15,7 @@ import { type MeasureContext, type WorkspaceId, concatLink } from '@hcengineering/core' import FormData from 'form-data' -import fetch from 'node-fetch' +import fetch, { type RequestInit, type Response } from 'node-fetch' import { Readable } from 'stream' /** @public */ @@ -34,11 +34,6 @@ export interface StatObjectOutput { size?: number } -/** @public */ -export interface PutObjectOutput { - id: string -} - interface BlobUploadError { key: string error: string @@ -54,7 +49,11 @@ type BlobUploadResult = 
BlobUploadSuccess | BlobUploadError /** @public */ export class Client { - constructor (private readonly endpoint: string) {} + private readonly endpoint: string + + constructor (host: string, port?: number) { + this.endpoint = port !== undefined ? `${host}:${port}` : host + } getObjectUrl (ctx: MeasureContext, workspace: WorkspaceId, objectName: string): string { const path = `/blob/${workspace.name}/${encodeURIComponent(objectName)}` @@ -63,21 +62,7 @@ export class Client { async getObject (ctx: MeasureContext, workspace: WorkspaceId, objectName: string): Promise { const url = this.getObjectUrl(ctx, workspace, objectName) - - let response - try { - response = await fetch(url) - } catch (err: any) { - ctx.error('network error', { error: err }) - throw new Error(`Network error ${err}`) - } - - if (!response.ok) { - if (response.status === 404) { - throw new Error('Not Found') - } - throw new Error('HTTP error ' + response.status) - } + const response = await fetchSafe(ctx, url) if (response.body == null) { ctx.error('bad datalake response', { objectName }) @@ -99,20 +84,7 @@ export class Client { Range: `bytes=${offset}-${length ?? ''}` } - let response - try { - response = await fetch(url, { headers }) - } catch (err: any) { - ctx.error('network error', { error: err }) - throw new Error(`Network error ${err}`) - } - - if (!response.ok) { - if (response.status === 404) { - throw new Error('Not Found') - } - throw new Error('HTTP error ' + response.status) - } + const response = await fetchSafe(ctx, url, { headers }) if (response.body == null) { ctx.error('bad datalake response', { objectName }) @@ -129,20 +101,7 @@ export class Client { ): Promise { const url = this.getObjectUrl(ctx, workspace, objectName) - let response - try { - response = await fetch(url, { method: 'HEAD' }) - } catch (err: any) { - ctx.error('network error', { error: err }) - throw new Error(`Network error ${err}`) - } - - if (!response.ok) { - if (response.status === 404) { - return undefined - } - throw new Error('HTTP error ' + response.status) - } + const response = await fetchSafe(ctx, url, { method: 'HEAD' }) const headers = response.headers const lastModified = Date.parse(headers.get('Last-Modified') ?? 
'') @@ -158,30 +117,35 @@ export class Client { async deleteObject (ctx: MeasureContext, workspace: WorkspaceId, objectName: string): Promise { const url = this.getObjectUrl(ctx, workspace, objectName) - - let response - try { - response = await fetch(url, { method: 'DELETE' }) - } catch (err: any) { - ctx.error('network error', { error: err }) - throw new Error(`Network error ${err}`) - } - - if (!response.ok) { - if (response.status === 404) { - throw new Error('Not Found') - } - throw new Error('HTTP error ' + response.status) - } + await fetchSafe(ctx, url, { method: 'DELETE' }) } async putObject ( + ctx: MeasureContext, + workspace: WorkspaceId, + objectName: string, + stream: Readable | Buffer | string, + metadata: ObjectMetadata, + size?: number + ): Promise { + if (size === undefined || size < 64 * 1024 * 1024) { + await ctx.with('direct-upload', {}, async (ctx) => { + await this.uploadWithFormData(ctx, workspace, objectName, stream, metadata) + }) + } else { + await ctx.with('signed-url-upload', {}, async (ctx) => { + await this.uploadWithSignedURL(ctx, workspace, objectName, stream, metadata) + }) + } + } + + private async uploadWithFormData ( ctx: MeasureContext, workspace: WorkspaceId, objectName: string, stream: Readable | Buffer | string, metadata: ObjectMetadata - ): Promise { + ): Promise { const path = `/upload/form-data/${workspace.name}` const url = concatLink(this.endpoint, path) @@ -196,17 +160,7 @@ export class Client { } form.append('file', stream, options) - let response - try { - response = await fetch(url, { method: 'POST', body: form }) - } catch (err: any) { - ctx.error('network error', { error: err }) - throw new Error(`Network error ${err}`) - } - - if (!response.ok) { - throw new Error('HTTP error ' + response.status) - } + const response = await fetchSafe(ctx, url, { method: 'POST', body: form }) const result = (await response.json()) as BlobUploadResult[] if (result.length !== 1) { @@ -219,8 +173,68 @@ export class Client { if ('error' in uploadResult) { ctx.error('error during blob upload', { objectName, error: uploadResult.error }) throw new Error('Upload failed: ' + uploadResult.error) - } else { - return { id: uploadResult.id } } } + + private async uploadWithSignedURL ( + ctx: MeasureContext, + workspace: WorkspaceId, + objectName: string, + stream: Readable | Buffer | string, + metadata: ObjectMetadata + ): Promise { + const url = await this.signObjectSign(ctx, workspace, objectName) + + try { + await fetchSafe(ctx, url, { + body: stream, + method: 'PUT', + headers: { + 'Content-Type': metadata.type, + 'Content-Length': metadata.size?.toString() ?? 
'0',
+          'x-amz-meta-last-modified': metadata.lastModified.toString()
+        }
+      })
+      await this.signObjectComplete(ctx, workspace, objectName)
+    } catch (err) {
+      // abort the signed upload, then surface the failure to the caller
+      await this.signObjectDelete(ctx, workspace, objectName)
+      throw err
+    }
+  }
+
+  private async signObjectSign (ctx: MeasureContext, workspace: WorkspaceId, objectName: string): Promise<string> {
+    const url = this.getSignObjectUrl(workspace, objectName)
+    const response = await fetchSafe(ctx, url, { method: 'POST' })
+    return await response.text()
+  }
+
+  private async signObjectComplete (ctx: MeasureContext, workspace: WorkspaceId, objectName: string): Promise<void> {
+    const url = this.getSignObjectUrl(workspace, objectName)
+    await fetchSafe(ctx, url, { method: 'PUT' })
+  }
+
+  private async signObjectDelete (ctx: MeasureContext, workspace: WorkspaceId, objectName: string): Promise<void> {
+    const url = this.getSignObjectUrl(workspace, objectName)
+    await fetchSafe(ctx, url, { method: 'DELETE' })
+  }
+
+  private getSignObjectUrl (workspace: WorkspaceId, objectName: string): string {
+    const path = `/upload/signed-url/${workspace.name}/${encodeURIComponent(objectName)}`
+    return concatLink(this.endpoint, path)
+  }
+}
+
+async function fetchSafe (ctx: MeasureContext, url: string, init?: RequestInit): Promise<Response> {
+  let response
+  try {
+    response = await fetch(url, init)
+  } catch (err: any) {
+    ctx.error('network error', { error: err })
+    throw new Error(`Network error ${err}`)
+  }
+
+  if (!response.ok) {
+    throw new Error(response.status === 404 ? 'Not Found' : 'HTTP error ' + response.status)
+  }
+
+  return response
+}
diff --git a/server/datalake/src/index.ts b/server/datalake/src/index.ts
index 071a063c50..3e58e548e5 100644
--- a/server/datalake/src/index.ts
+++ b/server/datalake/src/index.ts
@@ -37,7 +37,7 @@ export class DatalakeService implements StorageAdapter {
   static config = 'datalake'
   client: Client
   constructor (readonly opt: DatalakeConfig) {
-    this.client = new Client(opt.endpoint)
+    this.client = new Client(opt.endpoint, opt.port)
   }
 
   async initialize (ctx: MeasureContext, workspaceId: WorkspaceId): Promise<void> {}
@@ -129,7 +129,7 @@ export class DatalakeService implements StorageAdapter {
 
     await ctx.with('put', {}, async (ctx) => {
       await withRetry(ctx, 5, async () => {
-        return await this.client.putObject(ctx, workspaceId, objectName, stream, metadata)
+        await this.client.putObject(ctx, workspaceId, objectName, stream, metadata, size)
       })
     })
 
diff --git a/server/front/readme.md b/server/front/readme.md
index 20b4615959..0baf88ac2c 100644
--- a/server/front/readme.md
+++ b/server/front/readme.md
@@ -17,6 +17,7 @@ Front service is suited to deliver application bundles and resource assets, it a
 * MODEL_VERSION: Specifies the required model version.
 * SERVER_SECRET: Specifies the server secret.
 * PREVIEW_CONFIG: Specifies the preview configuration.
+* UPLOAD_CONFIG: Specifies the upload configuration.
 * BRANDING_URL: Specifies the URL of the branding service.
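For reference, the `UPLOAD_CONFIG` value is a `;`-separated list of `<method>|<size>|<url>` entries consumed by `parseUploadConfig` (see `packages/presentation/src/file.ts` above). A sketch of the parsed result; the endpoint URL and the 100 MiB threshold below are hypothetical examples, not shipped defaults:

```ts
import { parseUploadConfig } from '@hcengineering/presentation'

// 'signed-url|<MiB>|<url>' routes blobs larger than <MiB> through signed URLs,
// while 'form-data|<ignored>|<url>' would override the default multipart endpoint
const config = parseUploadConfig(
  'signed-url|100|https://datalake.example.com/upload/signed-url/:workspace/:blobId',
  '/files/:workspace'
)

// config['form-data']  => { url: '/files/:workspace' }
// config['signed-url'] => { url: 'https://datalake.example.com/upload/signed-url/:workspace/:blobId',
//                           size: 104857600 } // 100 * 1024 * 1024
```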
## Preview service configuration diff --git a/server/front/src/index.ts b/server/front/src/index.ts index a174f93b00..0a87fae70c 100644 --- a/server/front/src/index.ts +++ b/server/front/src/index.ts @@ -256,6 +256,7 @@ export function start ( collaboratorUrl: string brandingUrl?: string previewConfig: string + uploadConfig: string pushPublicKey?: string disableSignUp?: string }, @@ -308,6 +309,7 @@ export function start ( COLLABORATOR_URL: config.collaboratorUrl, BRANDING_URL: config.brandingUrl, PREVIEW_CONFIG: config.previewConfig, + UPLOAD_CONFIG: config.uploadConfig, PUSH_PUBLIC_KEY: config.pushPublicKey, DISABLE_SIGNUP: config.disableSignUp, ...(extraConfig ?? {}) @@ -501,8 +503,15 @@ export function start ( void filesHandler(req, res) }) - // eslint-disable-next-line @typescript-eslint/no-misused-promises - app.post('/files', async (req, res) => { + app.post('/files', (req, res) => { + void handleUpload(req, res) + }) + + app.post('/files/*', (req, res) => { + void handleUpload(req, res) + }) + + const handleUpload = async (req: Request, res: Response): Promise => { await ctx.with( 'post-file', {}, @@ -538,7 +547,7 @@ export function start ( }, { url: req.path, query: req.query } ) - }) + } const handleDelete = async (req: Request, res: Response): Promise => { try { diff --git a/server/front/src/starter.ts b/server/front/src/starter.ts index e1301978f7..8823fb27f1 100644 --- a/server/front/src/starter.ts +++ b/server/front/src/starter.ts @@ -101,6 +101,11 @@ export function startFront (ctx: MeasureContext, extraConfig?: Record { + const sql = postgres(env.HYPERDRIVE.connectionString) + const { bucket } = selectStorage(env, workspace) + + const blob = await db.getBlob(sql, { workspace, name }) + if (blob === null || blob.deleted) { + return error(404) + } + + const cache = caches.default + const cached = await cache.match(request) + if (cached !== undefined) { + return cached + } + + const range = request.headers.has('Range') ? request.headers : undefined + const object = await bucket.get(blob.filename, { range }) + if (object === null) { + return error(404) + } + + const headers = r2MetadataHeaders(object) + if (range !== undefined && object?.range !== undefined) { + headers.set('Content-Range', rangeHeader(object.range, object.size)) + } + + const length = object?.range !== undefined && 'length' in object.range ? object?.range?.length : undefined + const status = length !== undefined && length < object.size ? 
206 : 200
+
+  const response = new Response(object?.body, { headers, status })
+  ctx.waitUntil(cache.put(request, response.clone()))
+
+  return response
+}
+
+export async function handleBlobHead (
+  request: Request,
+  env: Env,
+  ctx: ExecutionContext,
+  workspace: string,
+  name: string
+): Promise<Response> {
+  const sql = postgres(env.HYPERDRIVE.connectionString)
+  const { bucket } = selectStorage(env, workspace)
+
+  const blob = await db.getBlob(sql, { workspace, name })
+  if (blob === null) {
+    return error(404)
+  }
+
+  const head = await bucket.head(blob.filename)
+  if (head?.httpMetadata === undefined) {
+    return error(404)
+  }
+
+  const headers = r2MetadataHeaders(head)
+  return new Response(null, { headers, status: 200 })
+}
+
+export async function deleteBlob (env: Env, workspace: string, name: string): Promise<Response> {
+  const sql = postgres(env.HYPERDRIVE.connectionString)
+
+  try {
+    await Promise.all([db.deleteBlob(sql, { workspace, name }), deleteVideo(env, workspace, name)])
+
+    return new Response(null, { status: 204 })
+  } catch (err: any) {
+    const message = err instanceof Error ? err.message : String(err)
+    console.error({ error: 'failed to delete blob: ' + message })
+    return error(500)
+  }
+}
+
+export async function postBlobFormData (request: Request, env: Env, workspace: string): Promise<Response> {
+  const sql = postgres(env.HYPERDRIVE.connectionString)
+  const formData = await request.formData()
+
+  const files: [File, key: string][] = []
+  formData.forEach((value: any, key: string) => {
+    if (typeof value === 'object') files.push([value, key])
+  })
+
+  const result = await Promise.all(
+    files.map(async ([file, key]) => {
+      const { name, type, lastModified } = file
+      try {
+        const metadata = await saveBlob(env, sql, file, type, workspace, name, lastModified)
+
+        // TODO this probably should happen via queue, let it be here for now
+        if (type.startsWith('video/')) {
+          const blobURL = getBlobURL(request, workspace, name)
+          await copyVideo(env, blobURL, workspace, name)
+        }
+
+        return { key, metadata }
+      } catch (err: any) {
+        const error = err instanceof Error ? err.message : String(err)
+        console.error('failed to upload blob:', error)
+        return { key, error }
+      }
+    })
+  )
+
+  return json(result)
+}
+
+async function saveBlob (
+  env: Env,
+  sql: postgres.Sql,
+  file: File,
+  type: string,
+  workspace: string,
+  name: string,
+  lastModified: number
+): Promise<{ type: string, size: number, lastModified: number, name: string }> {
+  const { location, bucket } = selectStorage(env, workspace)
+
+  const size = file.size
+  const [mimetype, subtype] = type.split('/')
+  const httpMetadata = { contentType: type, cacheControl }
+  const filename = getUniqueFilename()
+
+  const sha256hash = await getSha256(file)
+
+  if (sha256hash !== null) {
+    // Lucky boy, nothing to upload, use existing blob
+    const hash = sha256hash
+
+    const data = await db.getData(sql, { hash, location })
+    if (data !== null) {
+      await db.createBlob(sql, { workspace, name, hash, location })
+    } else {
+      await bucket.put(filename, file, { httpMetadata })
+      await sql.begin((sql) => [
+        db.createData(sql, { hash, location, filename, type: mimetype, subtype, size }),
+        db.createBlob(sql, { workspace, name, hash, location })
+      ])
+    }
+
+    return { type, size, lastModified, name }
+  } else {
+    // For large files we cannot calculate checksum beforehand
+    // upload file with unique filename and then obtain checksum
+    const object = await bucket.put(filename, file, { httpMetadata })
+
+    const hash =
+      object.checksums.md5 !== undefined ?
getMd5Checksum(object.checksums.md5) : (crypto.randomUUID() as UUID) + + const data = await db.getData(sql, { hash, location }) + if (data !== null) { + // We found an existing blob with the same hash + // we can safely remove the existing blob from storage + await Promise.all([bucket.delete(filename), db.createBlob(sql, { workspace, name, hash, location })]) + } else { + // Otherwise register a new hash and blob + await sql.begin((sql) => [ + db.createData(sql, { hash, location, filename, type: mimetype, subtype, size }), + db.createBlob(sql, { workspace, name, hash, location }) + ]) + } + + return { type, size, lastModified, name } + } +} + +export async function handleBlobUploaded (env: Env, workspace: string, name: string, filename: UUID): Promise { + const sql = postgres(env.HYPERDRIVE.connectionString) + const { location, bucket } = selectStorage(env, workspace) + + const object = await bucket.head(filename) + if (object?.httpMetadata === undefined) { + throw Error('blob not found') + } + + const hash = object.checksums.md5 !== undefined ? getMd5Checksum(object.checksums.md5) : (crypto.randomUUID() as UUID) + + const data = await db.getData(sql, { hash, location }) + if (data !== null) { + await Promise.all([bucket.delete(filename), db.createBlob(sql, { workspace, name, hash, location })]) + } else { + const size = object.size + const type = object.httpMetadata.contentType ?? 'application/octet-stream' + const [mimetype, subtype] = type.split('/') + + await db.createData(sql, { hash, location, filename, type: mimetype, subtype, size }) + await db.createBlob(sql, { workspace, name, hash, location }) + } +} + +function getUniqueFilename (): UUID { + return crypto.randomUUID() as UUID +} + +async function getSha256 (file: File): Promise { + if (file.size > HASH_LIMIT) { + return null + } + + const digestStream = new crypto.DigestStream('SHA-256') + await file.stream().pipeTo(digestStream) + const digest = await digestStream.digest + + return toUUID(new Uint8Array(digest)) +} + +function getMd5Checksum (digest: ArrayBuffer): UUID { + return toUUID(new Uint8Array(digest)) +} + +function rangeHeader (range: R2Range, size: number): string { + const offset = 'offset' in range ? range.offset : undefined + const length = 'length' in range ? range.length : undefined + const suffix = 'suffix' in range ? range.suffix : undefined + + const start = suffix !== undefined ? size - suffix : offset ?? 0 + const end = suffix !== undefined ? size : length !== undefined ? start + length : size + + return `bytes ${start}-${end - 1}/${size}` +} + +function r2MetadataHeaders (head: R2Object): Headers { + return head.httpMetadata !== undefined + ? new Headers({ + 'Accept-Ranges': 'bytes', + 'Content-Length': head.size.toString(), + 'Content-Type': head.httpMetadata.contentType ?? '', + 'Cache-Control': head.httpMetadata.cacheControl ?? cacheControl, + 'Last-Modified': head.uploaded.toUTCString(), + ETag: head.httpEtag + }) + : new Headers({ + 'Accept-Ranges': 'bytes', + 'Content-Length': head.size.toString(), + 'Cache-Control': cacheControl, + 'Last-Modified': head.uploaded.toUTCString(), + ETag: head.httpEtag + }) +} diff --git a/workers/datalake/src/cors.ts b/workers/datalake/src/cors.ts new file mode 100644 index 0000000000..978a2d31dd --- /dev/null +++ b/workers/datalake/src/cors.ts @@ -0,0 +1,103 @@ +// +// Copyright © 2024 Hardcore Engineering Inc. +// +// Licensed under the Eclipse Public License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
You may +// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// +// See the License for the specific language governing permissions and +// limitations under the License. +// + +import { type IRequest } from 'itty-router' + +// This is a copy of cors.ts from itty-router with following issues fixed: +// - https://github.com/kwhitley/itty-router/issues/242 +// - https://github.com/kwhitley/itty-router/issues/249 +export interface CorsOptions { + credentials?: true + origin?: boolean | string | string[] | RegExp | ((origin: string) => string | undefined) + maxAge?: number + allowMethods?: string | string[] + allowHeaders?: any + exposeHeaders?: string | string[] +} + +export type Preflight = (request: IRequest) => Response | undefined +export type Corsify = (response: Response, request?: IRequest) => Response | undefined + +export interface CorsPair { + preflight: Preflight + corsify: Corsify +} + +// Create CORS function with default options. +export const cors = (options: CorsOptions = {}): CorsPair => { + // Destructure and set defaults for options. + const { origin = '*', credentials = false, allowMethods = '*', allowHeaders, exposeHeaders, maxAge } = options + + const getAccessControlOrigin = (request?: Request): string | null | undefined => { + const requestOrigin = request?.headers.get('origin') // may be null if no request passed + if (requestOrigin === undefined || requestOrigin === null) return requestOrigin + + if (origin === true) return requestOrigin + if (origin instanceof RegExp) return origin.test(requestOrigin) ? requestOrigin : undefined + if (Array.isArray(origin)) return origin.includes(requestOrigin) ? requestOrigin : undefined + if (origin instanceof Function) return origin(requestOrigin) ?? undefined + + return origin === '*' && credentials ? requestOrigin : (origin as string) + } + + const appendHeadersAndReturn = (response: Response, headers: Record): Response => { + for (const [key, value] of Object.entries(headers)) { + if (value !== undefined && value !== null && value !== '') { + response.headers.append(key, value) + } + } + return response + } + + const preflight = (request: Request): Response | undefined => { + if (request.method === 'OPTIONS') { + const response = new Response(null, { status: 204 }) + + const allowMethodsHeader = Array.isArray(allowMethods) ? allowMethods.join(',') : allowMethods + const allowHeadersHeader = Array.isArray(allowHeaders) ? allowHeaders.join(',') : allowHeaders + const exposeHeadersHeader = Array.isArray(exposeHeaders) ? exposeHeaders.join(',') : exposeHeaders + + return appendHeadersAndReturn(response, { + 'access-control-allow-origin': getAccessControlOrigin(request), + 'access-control-allow-methods': allowMethodsHeader, + 'access-control-expose-headers': exposeHeadersHeader, + 'access-control-allow-headers': allowHeadersHeader ?? 
request.headers.get('access-control-request-headers'), + 'access-control-max-age': maxAge, + 'access-control-allow-credentials': credentials + }) + } // otherwise ignore + } + + const corsify = (response: Response, request?: Request): Response | undefined => { + // ignore if already has CORS headers + if (response?.headers?.has('access-control-allow-origin') || response.status === 101) { + return response + } + + const responseCopy = new Response(response.body, { + status: response.status, + statusText: response.statusText, + headers: response.headers + }) + + return appendHeadersAndReturn(responseCopy, { + 'access-control-allow-origin': getAccessControlOrigin(request), + 'access-control-allow-credentials': credentials + }) + } + + // Return corsify and preflight methods. + return { corsify, preflight } +} diff --git a/workers/datalake/src/db.ts b/workers/datalake/src/db.ts new file mode 100644 index 0000000000..ad7ea03f33 --- /dev/null +++ b/workers/datalake/src/db.ts @@ -0,0 +1,105 @@ +// +// Copyright © 2024 Hardcore Engineering Inc. +// +// Licensed under the Eclipse Public License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. You may +// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// +// See the License for the specific language governing permissions and +// limitations under the License. +// + +import type postgres from 'postgres' +import { type Location, type UUID } from './types' + +export interface BlobDataId { + hash: UUID + location: Location +} + +export interface BlobDataRecord extends BlobDataId { + filename: UUID + size: number + type: string + subtype: string +} + +export interface BlobId { + workspace: string + name: string +} + +export interface BlobRecord extends BlobId { + hash: UUID + location: Location + deleted: boolean +} + +export interface BlobRecordWithFilename extends BlobRecord { + filename: string +} + +export async function getData (sql: postgres.Sql, dataId: BlobDataId): Promise { + const { hash, location } = dataId + + const rows = await sql` + SELECT hash, location, filename, size, type, subtype + FROM blob.data + WHERE hash = ${hash} AND location = ${location} + ` + + if (rows.length > 0) { + return rows[0] + } + + return null +} + +export async function createData (sql: postgres.Sql, data: BlobDataRecord): Promise { + const { hash, location, filename, size, type, subtype } = data + + await sql` + UPSERT INTO blob.data (hash, location, filename, size, type, subtype) + VALUES (${hash}, ${location}, ${filename}, ${size}, ${type}, ${subtype}) + ` +} + +export async function getBlob (sql: postgres.Sql, blobId: BlobId): Promise { + const { workspace, name } = blobId + + const rows = await sql` + SELECT b.workspace, b.name, b.hash, b.location, b.deleted, d.filename + FROM blob.blob AS b + JOIN blob.data AS d ON b.hash = d.hash AND b.location = d.location + WHERE b.workspace = ${workspace} AND b.name = ${name} + ` + + if (rows.length > 0) { + return rows[0] + } + + return null +} + +export async function createBlob (sql: postgres.Sql, blob: Omit): Promise { + const { workspace, name, hash, location } = blob + + await sql` + UPSERT INTO blob.blob (workspace, name, hash, location, deleted) + VALUES (${workspace}, ${name}, ${hash}, ${location}, false) + ` +} + +export 
diff --git a/workers/datalake/src/encodings.ts b/workers/datalake/src/encodings.ts
new file mode 100644
index 0000000000..7cf3b9347d
--- /dev/null
+++ b/workers/datalake/src/encodings.ts
@@ -0,0 +1,37 @@
+//
+// Copyright © 2024 Hardcore Engineering Inc.
+//
+// Licensed under the Eclipse Public License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License. You may
+// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+import { type UUID } from './types'
+
+export const toUUID = (buffer: Uint8Array): UUID => {
+  const hex = toHex(buffer)
+  const hex32 = hex.slice(0, 32).padStart(32, '0')
+  return formatHexAsUUID(hex32)
+}
+
+export const toHex = (buffer: Uint8Array): string => {
+  return Array.from(buffer)
+    .map((b) => b.toString(16).padStart(2, '0'))
+    .join('')
+}
+
+export const etag = (id: string): string => `"${id}"`
+
+export function formatHexAsUUID (hexString: string): UUID {
+  if (hexString.length !== 32) {
+    throw new Error('Hex string must be exactly 32 characters long.')
+  }
+  return hexString.replace(/^(.{8})(.{4})(.{4})(.{4})(.{12})$/, '$1-$2-$3-$4-$5') as UUID
+}
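Taken together, these helpers turn a digest into a stable blob id. A hedged sketch of the composition, assuming the digest algorithm is SHA-256 (this section of the PR does not show which hash feeds toUUID):

import { etag, toUUID } from './encodings'

async function contentId (payload: ArrayBuffer): Promise<{ uuid: string, tag: string }> {
  const digest = await crypto.subtle.digest('SHA-256', payload) // 32 bytes
  const uuid = toUUID(new Uint8Array(digest)) // first 32 hex chars, formatted as a UUID
  return { uuid, tag: etag(uuid) } // etag wraps the id in quotes for HTTP headers
}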
diff --git a/workers/datalake/src/image.ts b/workers/datalake/src/image.ts
new file mode 100644
index 0000000000..c309d114b2
--- /dev/null
+++ b/workers/datalake/src/image.ts
@@ -0,0 +1,50 @@
+//
+// Copyright © 2024 Hardcore Engineering Inc.
+//
+// Licensed under the Eclipse Public License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License. You may
+// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+import { getBlobURL } from './blob'
+
+const preferredImageFormats = ['webp', 'avif', 'jpeg', 'png']
+
+export async function getImage (
+  request: Request,
+  workspace: string,
+  name: string,
+  transform: string
+): Promise<Response> {
+  const Accept = request.headers.get('Accept') ?? 'image/*'
+  const image: Record<string, string> = {}
+
+  // select format based on Accept header
+  const formats = Accept.split(',')
+  for (const format of formats) {
+    const [type] = format.split(';')
+    const [clazz, kind] = type.split('/')
+    if (clazz === 'image' && preferredImageFormats.includes(kind)) {
+      image.format = kind
+      break
+    }
+  }
+
+  // apply transforms
+  transform.split(',').reduce((acc, param) => {
+    const [key, value] = param.split('=')
+    acc[key] = value
+    return acc
+  }, image)
+
+  const blobURL = getBlobURL(request, workspace, name)
+  const imageRequest = new Request(blobURL, { headers: { Accept } })
+  return await fetch(imageRequest, { cf: { image, cacheTtl: 3600 } })
+}
diff --git a/workers/datalake/src/index.ts b/workers/datalake/src/index.ts
new file mode 100644
index 0000000000..ffd0fed9e1
--- /dev/null
+++ b/workers/datalake/src/index.ts
@@ -0,0 +1,73 @@
+//
+// Copyright © 2024 Hardcore Engineering Inc.
+//
+// Licensed under the Eclipse Public License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License. You may
+// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+import { type IRequest, Router, error, html } from 'itty-router'
+import {
+  deleteBlob as handleBlobDelete,
+  handleBlobGet,
+  handleBlobHead,
+  postBlobFormData as handleUploadFormData
+} from './blob'
+import { cors } from './cors'
+import { getImage as handleImageGet } from './image'
+import { getVideoMeta as handleVideoMetaGet } from './video'
+import { handleSignAbort, handleSignComplete, handleSignCreate } from './sign'
+
+const { preflight, corsify } = cors({
+  maxAge: 86400
+})
+
+export default {
+  async fetch (request, env, ctx): Promise<Response> {
+    const router = Router<IRequest>({
+      before: [preflight],
+      finally: [corsify]
+    })
+
+    router
+      .get('/blob/:workspace/:name', ({ params }) => handleBlobGet(request, env, ctx, params.workspace, params.name))
+      .head('/blob/:workspace/:name', ({ params }) => handleBlobHead(request, env, ctx, params.workspace, params.name))
+      .delete('/blob/:workspace/:name', ({ params }) => handleBlobDelete(env, params.workspace, params.name))
+      // Image
+      .get('/image/:transform/:workspace/:name', ({ params }) =>
+        handleImageGet(request, params.workspace, params.name, params.transform)
+      )
+      // Video
+      .get('/video/:workspace/:name/meta', ({ params }) =>
+        handleVideoMetaGet(request, env, ctx, params.workspace, params.name)
+      )
+      // Form Data
+      .post('/upload/form-data/:workspace', ({ params }) => handleUploadFormData(request, env, params.workspace))
+      // Signed URL
+      .post('/upload/signed-url/:workspace/:name', ({ params }) =>
+        handleSignCreate(request, env, ctx, params.workspace, params.name)
+      )
+      .put('/upload/signed-url/:workspace/:name', ({ params }) =>
+        handleSignComplete(request, env, ctx, params.workspace, params.name)
+      )
+      .delete('/upload/signed-url/:workspace/:name', ({ params }) =>
+        handleSignAbort(request, env, ctx, params.workspace, params.name)
+      )
+      .all('/', () =>
+        html(
+          `Huly® Datalake™ https://huly.io
+          © 2024 Huly Labs`
+        )
+      )
+      .all('*', () => error(404))
+
+    return await router.fetch(request).catch(error)
+  }
+} satisfies ExportedHandler<Env>
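The before/preflight and finally/corsify pairing means OPTIONS requests are answered before any route handler runs, while every other response is stamped with CORS headers on the way out. A small sketch of the same wiring in isolation (the /ping route and URL are hypothetical):

import { Router } from 'itty-router'
import { cors } from './cors'

const { preflight, corsify } = cors({ maxAge: 86400 })
const router = Router({ before: [preflight], finally: [corsify] })
router.get('/ping', () => new Response('pong'))

// an OPTIONS request is short-circuited by preflight; a GET runs the
// handler and picks up access-control-allow-origin in corsify
const res = await router.fetch(new Request('https://example.com/ping'))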
diff --git a/workers/datalake/src/sign.ts b/workers/datalake/src/sign.ts
new file mode 100644
index 0000000000..e1b2ed1aad
--- /dev/null
+++ b/workers/datalake/src/sign.ts
@@ -0,0 +1,136 @@
+//
+// Copyright © 2024 Hardcore Engineering Inc.
+//
+// Licensed under the Eclipse Public License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License. You may
+// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+import { AwsClient } from 'aws4fetch'
+import { error } from 'itty-router'
+
+import { handleBlobUploaded } from './blob'
+import { type UUID } from './types'
+import { selectStorage, type Storage } from './storage'
+
+const S3_SIGNED_LINK_TTL = 3600
+
+interface SignBlobInfo {
+  uuid: UUID
+}
+
+function signBlobKey (workspace: string, name: string): string {
+  return `s/${workspace}/${name}`
+}
+
+function getS3Client (storage: Storage): AwsClient {
+  return new AwsClient({
+    service: 's3',
+    region: 'auto',
+    accessKeyId: storage.bucketAccessKey,
+    secretAccessKey: storage.bucketSecretKey
+  })
+}
+
+export async function handleSignCreate (
+  request: Request,
+  env: Env,
+  ctx: ExecutionContext,
+  workspace: string,
+  name: string
+): Promise<Response> {
+  const storage = selectStorage(env, workspace)
+  const accountId = env.R2_ACCOUNT_ID
+
+  const key = signBlobKey(workspace, name)
+  const uuid = crypto.randomUUID() as UUID
+
+  // Generate R2 object link
+  const url = new URL(`https://${storage.bucketName}.${accountId}.r2.cloudflarestorage.com`)
+  url.pathname = uuid
+  url.searchParams.set('X-Amz-Expires', S3_SIGNED_LINK_TTL.toString())
+
+  // Sign R2 object link
+  let signed: Request
+  try {
+    const client = getS3Client(storage)
+
+    signed = await client.sign(new Request(url, { method: 'PUT' }), { aws: { signQuery: true } })
+  } catch (err: any) {
+    console.error({ error: 'failed to generate signed url', message: `${err}` })
+    return error(500, 'failed to generate signed url')
+  }
+
+  // Save upload details
+  const s3BlobInfo: SignBlobInfo = { uuid }
+  await env.datalake_blobs.put(key, JSON.stringify(s3BlobInfo), { expirationTtl: S3_SIGNED_LINK_TTL })
+
+  const headers = new Headers({
+    Expires: new Date(Date.now() + S3_SIGNED_LINK_TTL * 1000).toISOString()
+  })
+  return new Response(signed.url, { status: 200, headers })
+}
+
+export async function handleSignComplete (
+  request: Request,
+  env: Env,
+  ctx: ExecutionContext,
+  workspace: string,
+  name: string
+): Promise<Response> {
+  const { bucket } = selectStorage(env, workspace)
+  const key = signBlobKey(workspace, name)
+
+  // Ensure we generated a presigned URL earlier
+  // TODO: handle requests that arrive after the signed URL has expired
+  const signBlobInfo = await env.datalake_blobs.get<SignBlobInfo>(key, { type: 'json' })
+  if (signBlobInfo === null) {
+    console.error({ error: 'blob sign info not found', workspace, name })
+    return error(404)
+  }
+
+  // Ensure the blob has been uploaded
+  const { uuid } = signBlobInfo
+  const head = await bucket.get(uuid)
+  if (head === null) {
+    console.error({ error: 'blob not found', workspace, name, uuid })
+    return error(400)
+  }
+
+  try {
+    await handleBlobUploaded(env, workspace, name, uuid)
+  } catch (err) {
+    const message = err instanceof Error ? err.message : String(err)
+    console.error({ error: message, workspace, name, uuid })
+    return error(500, 'failed to upload blob')
+  }
+
+  await env.datalake_blobs.delete(key)
+
+  return new Response(null, { status: 201 })
+}
+
+export async function handleSignAbort (
+  request: Request,
+  env: Env,
+  ctx: ExecutionContext,
+  workspace: string,
+  name: string
+): Promise<Response> {
+  const key = signBlobKey(workspace, name)
+
+  // Remove the pending sign info, if any
+  const s3BlobInfo = await env.datalake_blobs.get<SignBlobInfo>(key, { type: 'json' })
+  if (s3BlobInfo !== null) {
+    await env.datalake_blobs.delete(key)
+  }
+
+  return new Response(null, { status: 204 })
+}
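From a client's point of view the three handlers above form one upload protocol: POST to obtain a presigned R2 URL, PUT the payload directly to that URL, then PUT back to the worker so it can verify the object and register the blob (or DELETE to abort). A sketch of that flow with a hypothetical base URL:

const base = 'https://datalake.example.com'

async function uploadSigned (workspace: string, name: string, payload: Blob): Promise<void> {
  // 1. handleSignCreate returns the presigned URL in the response body
  const res = await fetch(`${base}/upload/signed-url/${workspace}/${name}`, { method: 'POST' })
  const signedUrl = await res.text()

  // 2. upload directly to R2, bypassing the worker
  await fetch(signedUrl, { method: 'PUT', body: payload })

  // 3. handleSignComplete verifies the object exists and records the blob
  await fetch(`${base}/upload/signed-url/${workspace}/${name}`, { method: 'PUT' })
}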
diff --git a/workers/datalake/src/storage.ts b/workers/datalake/src/storage.ts
new file mode 100644
index 0000000000..777ab03317
--- /dev/null
+++ b/workers/datalake/src/storage.ts
@@ -0,0 +1,76 @@
+//
+// Copyright © 2024 Hardcore Engineering Inc.
+//
+// Licensed under the Eclipse Public License, Version 2.0 (the 'License');
+// you may not use this file except in compliance with the License. You may
+// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an 'AS IS' BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+import { type Location } from './types'
+
+export interface Storage {
+  location: Location
+  bucket: R2Bucket
+
+  bucketName: string
+  bucketAccessKey: string
+  bucketSecretKey: string
+}
+
+export function selectStorage (env: Env, workspace: string): Storage {
+  const location = selectLocation(env, workspace)
+  switch (location) {
+    case 'apac':
+      return {
+        location,
+        bucket: env.DATALAKE_APAC,
+        bucketName: env.DATALAKE_APAC_BUCKET_NAME,
+        bucketAccessKey: env.DATALAKE_APAC_ACCESS_KEY,
+        bucketSecretKey: env.DATALAKE_APAC_SECRET_KEY
+      }
+    case 'eeur':
+      return {
+        location,
+        bucket: env.DATALAKE_EEUR,
+        bucketName: env.DATALAKE_EEUR_BUCKET_NAME,
+        bucketAccessKey: env.DATALAKE_EEUR_ACCESS_KEY,
+        bucketSecretKey: env.DATALAKE_EEUR_SECRET_KEY
+      }
+    case 'weur':
+      return {
+        location,
+        bucket: env.DATALAKE_WEUR,
+        bucketName: env.DATALAKE_WEUR_BUCKET_NAME,
+        bucketAccessKey: env.DATALAKE_WEUR_ACCESS_KEY,
+        bucketSecretKey: env.DATALAKE_WEUR_SECRET_KEY
+      }
+    case 'enam':
+      return {
+        location,
+        bucket: env.DATALAKE_ENAM,
+        bucketName: env.DATALAKE_ENAM_BUCKET_NAME,
+        bucketAccessKey: env.DATALAKE_ENAM_ACCESS_KEY,
+        bucketSecretKey: env.DATALAKE_ENAM_SECRET_KEY
+      }
+    case 'wnam':
+      return {
+        location,
+        bucket: env.DATALAKE_WNAM,
+        bucketName: env.DATALAKE_WNAM_BUCKET_NAME,
+        bucketAccessKey: env.DATALAKE_WNAM_ACCESS_KEY,
+        bucketSecretKey: env.DATALAKE_WNAM_SECRET_KEY
+      }
+  }
+}
+
+function selectLocation (env: Env, workspace: string): Location {
+  // TODO select location based on workspace
+  return 'weur'
+}
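selectLocation is still a stub that pins every workspace to 'weur'. One hypothetical direction, shown only as an illustration of what the TODO might become, is to spread workspaces deterministically until real routing metadata exists:

import { type Location } from './types'

const locations: Location[] = ['weur', 'eeur', 'wnam', 'enam', 'apac']

// Hypothetical: a stable hash of the workspace id picks a location.
// Note that once data is written, a workspace's location must stay fixed,
// so a production version would more likely persist the assignment.
function hashedLocation (workspace: string): Location {
  let h = 0
  for (const ch of workspace) {
    h = (h * 31 + (ch.codePointAt(0) ?? 0)) >>> 0
  }
  return locations[h % locations.length]
}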
diff --git a/workers/datalake/src/types.ts b/workers/datalake/src/types.ts
new file mode 100644
index 0000000000..5e5baad474
--- /dev/null
+++ b/workers/datalake/src/types.ts
@@ -0,0 +1,32 @@
+//
+// Copyright © 2024 Hardcore Engineering Inc.
+//
+// Licensed under the Eclipse Public License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License. You may
+// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+export type Location = 'weur' | 'eeur' | 'wnam' | 'enam' | 'apac'
+
+export type UUID = string & { __uuid: true }
+
+export interface CloudflareResponse {
+  success: boolean
+  errors: any
+  messages: any
+  result: any
+}
+
+export interface StreamUploadResponse extends CloudflareResponse {
+  result: {
+    uid: string
+    uploadURL: string
+  }
+}
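UUID is a branded string, so arbitrary strings will not typecheck where a UUID is expected; the only entry points are explicit casts, like the crypto.randomUUID() as UUID in sign.ts. A two-line illustration (the lookup signature is hypothetical):

import { type UUID } from './types'

declare const lookup: (hash: UUID) => void

// lookup('not-a-uuid')              // rejected: string is not assignable to UUID
lookup(crypto.randomUUID() as UUID)  // ok: the cast applies the brand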
diff --git a/workers/datalake/src/video.ts b/workers/datalake/src/video.ts
new file mode 100644
index 0000000000..229490c009
--- /dev/null
+++ b/workers/datalake/src/video.ts
@@ -0,0 +1,121 @@
+//
+// Copyright © 2024 Hardcore Engineering Inc.
+//
+// Licensed under the Eclipse Public License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License. You may
+// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+import { error, json } from 'itty-router'
+
+import { type CloudflareResponse, type StreamUploadResponse } from './types'
+
+export type StreamUploadState = 'ready' | 'error' | 'inprogress' | 'queued' | 'downloading' | 'pendingupload'
+
+// https://developers.cloudflare.com/api/operations/stream-videos-list-videos#response-body
+export interface StreamDetailsResponse extends CloudflareResponse {
+  result: {
+    uid: string
+    thumbnail: string
+    status: {
+      state: StreamUploadState
+    }
+    playback: {
+      hls: string
+      dash: string
+    }
+  }
+}
+
+interface StreamBlobInfo {
+  streamId: string
+}
+
+function streamBlobKey (workspace: string, name: string): string {
+  return `v/${workspace}/${name}`
+}
+
+export async function getVideoMeta (
+  request: Request,
+  env: Env,
+  ctx: ExecutionContext,
+  workspace: string,
+  name: string
+): Promise<Response> {
+  const key = streamBlobKey(workspace, name)
+
+  const streamInfo = await env.datalake_blobs.get<StreamBlobInfo>(key, { type: 'json' })
+  if (streamInfo === null) {
+    return error(404)
+  }
+
+  const url = `https://api.cloudflare.com/client/v4/accounts/${env.STREAMS_ACCOUNT_ID}/stream/${streamInfo.streamId}`
+  const streamRequest = new Request(url, {
+    headers: {
+      Authorization: `Bearer ${env.STREAMS_AUTH_KEY}`,
+      'Content-Type': 'application/json'
+    }
+  })
+
+  const streamResponse = await fetch(streamRequest)
+  const stream = await streamResponse.json<StreamDetailsResponse>()
+
+  if (stream.success) {
+    return json({
+      status: stream.result.status.state,
+      thumbnail: stream.result.thumbnail,
+      hls: stream.result.playback.hls
+    })
+  } else {
+    return error(500, { errors: stream.errors })
+  }
+}
+
+export async function copyVideo (env: Env, source: string, workspace: string, name: string): Promise<void> {
+  const key = streamBlobKey(workspace, name)
+
+  const url = `https://api.cloudflare.com/client/v4/accounts/${env.STREAMS_ACCOUNT_ID}/stream/copy`
+  const request = new Request(url, {
+    method: 'POST',
+    headers: {
+      Authorization: `Bearer ${env.STREAMS_AUTH_KEY}`,
+      'Content-Type': 'application/json'
+    },
+    body: JSON.stringify({ url: source, meta: { name } })
+  })
+
+  const response = await fetch(request)
+  const upload = await response.json<StreamUploadResponse>()
+
+  if (upload.success) {
+    const streamInfo: StreamBlobInfo = {
+      streamId: upload.result.uid
+    }
+    await env.datalake_blobs.put(key, JSON.stringify(streamInfo))
+  }
+}
+
+export async function deleteVideo (env: Env, workspace: string, name: string): Promise<void> {
+  const key = streamBlobKey(workspace, name)
+
+  const streamInfo = await env.datalake_blobs.get<StreamBlobInfo>(key, { type: 'json' })
+  if (streamInfo !== null) {
+    const url = `https://api.cloudflare.com/client/v4/accounts/${env.STREAMS_ACCOUNT_ID}/stream/${streamInfo.streamId}`
+    const request = new Request(url, {
+      method: 'DELETE',
+      headers: {
+        Authorization: `Bearer ${env.STREAMS_AUTH_KEY}`,
+        'Content-Type': 'application/json'
+      }
+    })
+
+    await Promise.all([fetch(request), env.datalake_blobs.delete(key)])
+  }
+}
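Since Stream ingests asynchronously, clients of the /video/:workspace/:name/meta route are expected to poll until the copy reaches 'ready'. A hedged client sketch (the base URL and 5-second interval are hypothetical):

const base = 'https://datalake.example.com'

async function waitForVideo (workspace: string, name: string): Promise<{ status: string, thumbnail: string, hls: string }> {
  for (;;) {
    const res = await fetch(`${base}/video/${workspace}/${name}/meta`)
    if (res.ok) {
      const meta = await res.json() as { status: string, thumbnail: string, hls: string }
      if (meta.status === 'ready') {
        return meta
      }
    }
    await new Promise((resolve) => setTimeout(resolve, 5000))
  }
}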
"datalake-weur", preview_bucket_name = "dev-datalake-eu-west" }, + { binding = "DATALAKE_ENAM", bucket_name = "datalake-enam", preview_bucket_name = "dev-datalake-eu-west" }, + { binding = "DATALAKE_WNAM", bucket_name = "datalake-wnam", preview_bucket_name = "dev-datalake-eu-west" } +] + +[[hyperdrive]] +binding = "HYPERDRIVE" +id = "87259c3ae41e41a7b35e610d4282d85a" +localConnectionString = "postgresql://root:roach@localhost:26257/datalake" + +[observability] +enabled = true +head_sampling_rate = 1 + +[vars] +DATALAKE_EEUR_BUCKET_NAME = "datalake-eeur" +# DATALAKE_EEUR_ACCESS_KEY = "" +# DATALAKE_EEUR_SECRET_KEY = "" +DATALAKE_WEUR_BUCKET_NAME = "datalake-weur" +# DATALAKE_WEUR_ACCESS_KEY = "" +# DATALAKE_WEUR_SECRET_KEY = "" +DATALAKE_APAC_BUCKET_NAME = "datalake-apac" +# DATALAKE_APAC_ACCESS_KEY = "" +# DATALAKE_APAC_SECRET_KEY = "" +DATALAKE_ENAM_BUCKET_NAME = "datalake-enam" +# DATALAKE_ENAM_ACCESS_KEY = "" +# DATALAKE_ENAM_SECRET_KEY = "" +DATALAKE_WNAM_BUCKET_NAME = "datalake-wnam" +# DATALAKE_WNAM_ACCESS_KEY = "" +# DATALAKE_WNAM_SECRET_KEY = "" + +# STREAMS_ACCOUNT_ID = "" +# STREAMS_AUTH_KEY = "" +# R2_ACCOUNT_ID = ""