diff --git a/server/datalake/src/client.ts b/server/datalake/src/client.ts
index 042d785a0e..8f64dfa372 100644
--- a/server/datalake/src/client.ts
+++ b/server/datalake/src/client.ts
@@ -25,9 +25,16 @@ export interface ObjectMetadata {
   lastModified: number
   name: string
   type: string
+  etag: string
   size?: number
 }
 
+/** @public */
+export interface ListObjectOutput {
+  cursor: string | undefined
+  blobs: Omit<ObjectMetadata, 'lastModified'>[]
+}
+
 /** @public */
 export interface StatObjectOutput {
   lastModified: number
@@ -36,6 +43,13 @@
   size?: number
 }
 
+/** @public */
+export interface UploadObjectParams {
+  lastModified: number
+  type: string
+  size?: number
+}
+
 interface BlobUploadError {
   key: string
   error: string
 }
@@ -68,6 +82,23 @@ export class DatalakeClient {
     return concatLink(this.endpoint, path)
   }
 
+  async listObjects (
+    ctx: MeasureContext,
+    workspace: WorkspaceId,
+    cursor: string | undefined
+  ): Promise<ListObjectOutput> {
+    const limit = 100
+    const path = `/blob/${workspace.name}`
+    const url = new URL(concatLink(this.endpoint, path))
+    url.searchParams.append('limit', String(limit))
+    if (cursor !== undefined) {
+      url.searchParams.append('cursor', cursor)
+    }
+
+    const response = await fetchSafe(ctx, url)
+    return (await response.json()) as ListObjectOutput
+  }
+
   async getObject (ctx: MeasureContext, workspace: WorkspaceId, objectName: string): Promise<Readable> {
     const url = this.getObjectUrl(ctx, workspace, objectName)
 
@@ -166,9 +197,9 @@
     workspace: WorkspaceId,
     objectName: string,
     stream: Readable | Buffer | string,
-    metadata: ObjectMetadata,
-    size?: number
-  ): Promise<void> {
+    params: UploadObjectParams
+  ): Promise<ObjectMetadata> {
+    let size = params.size
     if (size === undefined) {
       if (Buffer.isBuffer(stream)) {
         size = stream.length
@@ -182,12 +213,12 @@
 
     try {
       if (size === undefined || size < 64 * 1024 * 1024) {
-        await ctx.with('direct-upload', {}, (ctx) =>
-          this.uploadWithFormData(ctx, workspace, objectName, stream, metadata)
+        return await ctx.with('direct-upload', {}, (ctx) =>
+          this.uploadWithFormData(ctx, workspace, objectName, stream, { ...params, size })
         )
       } else {
-        await ctx.with('signed-url-upload', {}, (ctx) =>
-          this.uploadWithSignedURL(ctx, workspace, objectName, stream, metadata)
+        return await ctx.with('signed-url-upload', {}, (ctx) =>
+          this.uploadWithSignedURL(ctx, workspace, objectName, stream, { ...params, size })
         )
       }
     } catch (err) {
@@ -201,18 +232,18 @@
     workspace: WorkspaceId,
     objectName: string,
     stream: Readable | Buffer | string,
-    metadata: ObjectMetadata
-  ): Promise<void> {
+    params: UploadObjectParams
+  ): Promise<ObjectMetadata> {
     const path = `/upload/form-data/${workspace.name}`
     const url = concatLink(this.endpoint, path)
 
     const form = new FormData()
     const options: FormData.AppendOptions = {
       filename: objectName,
-      contentType: metadata.type,
-      knownLength: metadata.size,
+      contentType: params.type,
+      knownLength: params.size,
       header: {
-        'Last-Modified': metadata.lastModified
+        'Last-Modified': params.lastModified
       }
     }
     form.append('file', stream, options)
@@ -229,6 +260,8 @@
     if ('error' in uploadResult) {
       throw new DatalakeError('Upload failed: ' + uploadResult.error)
     }
+
+    return uploadResult.metadata
   }
 
   async uploadMultipart (
@@ -236,11 +269,11 @@
     workspace: WorkspaceId,
     objectName: string,
     stream: Readable | Buffer | string,
-    metadata: ObjectMetadata
-  ): Promise<void> {
+    params: UploadObjectParams
+  ): Promise<ObjectMetadata> {
     const chunkSize = 10 * 1024 * 1024
 
-    const multipart = await this.multipartUploadStart(ctx, workspace, objectName, metadata)
+    const multipart = await this.multipartUploadStart(ctx, workspace, objectName, params)
 
     try {
       const parts: MultipartUploadPart[] = []
@@ -252,7 +285,7 @@
         partNumber++
       }
 
-      await this.multipartUploadComplete(ctx, workspace, objectName, multipart, parts)
+      return await this.multipartUploadComplete(ctx, workspace, objectName, multipart, parts)
     } catch (err: any) {
       await this.multipartUploadAbort(ctx, workspace, objectName, multipart)
       throw err
@@ -264,8 +297,8 @@
     workspace: WorkspaceId,
     objectName: string,
     stream: Readable | Buffer | string,
-    metadata: ObjectMetadata
-  ): Promise<void> {
+    params: UploadObjectParams
+  ): Promise<ObjectMetadata> {
     const url = await this.signObjectSign(ctx, workspace, objectName)
 
     try {
@@ -273,8 +306,8 @@
         body: stream,
         method: 'PUT',
         headers: {
-          'Content-Type': metadata.type,
-          'Content-Length': metadata.size?.toString() ?? '0'
+          'Content-Type': params.type,
+          'Content-Length': params.size?.toString() ?? '0'
           // 'x-amz-meta-last-modified': metadata.lastModified.toString()
         }
       })
@@ -284,7 +317,7 @@
       throw new DatalakeError('Failed to upload via signed URL')
     }
 
-    await this.signObjectComplete(ctx, workspace, objectName)
+    return await this.signObjectComplete(ctx, workspace, objectName)
   }
 
   async uploadFromS3 (
@@ -322,10 +355,15 @@
     }
   }
 
-  private async signObjectComplete (ctx: MeasureContext, workspace: WorkspaceId, objectName: string): Promise<void> {
+  private async signObjectComplete (
+    ctx: MeasureContext,
+    workspace: WorkspaceId,
+    objectName: string
+  ): Promise<ObjectMetadata> {
     try {
       const url = this.getSignObjectUrl(workspace, objectName)
-      await fetchSafe(ctx, url, { method: 'PUT' })
+      const res = await fetchSafe(ctx, url, { method: 'PUT' })
+      return (await res.json()) as ObjectMetadata
     } catch (err: any) {
       ctx.error('failed to complete signed url upload', { workspace, objectName, err })
       throw new DatalakeError('Failed to complete signed URL upload')
@@ -353,16 +391,16 @@
     ctx: MeasureContext,
     workspace: WorkspaceId,
     objectName: string,
-    metadata: ObjectMetadata
+    params: UploadObjectParams
   ): Promise<MultipartUpload> {
     const path = `/upload/multipart/${workspace.name}/${encodeURIComponent(objectName)}`
     const url = concatLink(this.endpoint, path)
 
     try {
       const headers = {
-        'Content-Type': metadata.type,
-        'Content-Length': metadata.size?.toString() ?? '0',
-        'Last-Modified': new Date(metadata.lastModified).toUTCString()
+        'Content-Type': params.type,
+        'Content-Length': params.size?.toString() ?? '0',
+        'Last-Modified': new Date(params.lastModified).toUTCString()
       }
       const response = await fetchSafe(ctx, url, { method: 'POST', headers })
       return (await response.json()) as MultipartUpload
@@ -401,14 +439,15 @@
     objectName: string,
     multipart: MultipartUpload,
     parts: MultipartUploadPart[]
-  ): Promise<void> {
+  ): Promise<ObjectMetadata> {
     const path = `/upload/multipart/${workspace.name}/${encodeURIComponent(objectName)}/complete`
     const url = new URL(concatLink(this.endpoint, path))
     url.searchParams.append('key', multipart.key)
     url.searchParams.append('uploadId', multipart.uploadId)
 
     try {
-      await fetchSafe(ctx, url, { method: 'POST', body: JSON.stringify({ parts }) })
+      const res = await fetchSafe(ctx, url, { method: 'POST', body: JSON.stringify({ parts }) })
+      return (await res.json()) as ObjectMetadata
     } catch (err: any) {
       ctx.error('failed to complete multipart upload', { workspace, objectName, err })
       throw new DatalakeError('Failed to complete multipart upload')
diff --git a/server/datalake/src/index.ts b/server/datalake/src/index.ts
index d0f5c87469..3f36b45a90 100644
--- a/server/datalake/src/index.ts
+++ b/server/datalake/src/index.ts
@@ -18,13 +18,14 @@ import core, { type Blob, type MeasureContext, type Ref, type WorkspaceId, withC
 import {
   type BlobStorageIterator,
   type BucketInfo,
+  type ListBlobResult,
   type StorageAdapter,
   type StorageConfig,
   type StorageConfiguration,
   type UploadedObjectInfo
 } from '@hcengineering/server-core'
 import { type Readable } from 'stream'
-import { type ObjectMetadata, DatalakeClient } from './client'
+import { type UploadObjectParams, DatalakeClient } from './client'
 
 export { DatalakeClient }
 
@@ -88,8 +89,36 @@
   @withContext('listStream')
   async listStream (ctx: MeasureContext, workspaceId: WorkspaceId): Promise<BlobStorageIterator> {
+    let hasMore = true
+    const buffer: ListBlobResult[] = []
+    let cursor: string | undefined
+
     return {
-      next: async () => [],
+      next: async () => {
+        try {
+          while (hasMore && buffer.length < 50) {
+            const res = await this.client.listObjects(ctx, workspaceId, cursor)
+            hasMore = res.cursor !== undefined
+            cursor = res.cursor
+
+            for (const blob of res.blobs) {
+              buffer.push({
+                _id: blob.name as Ref<Blob>,
+                _class: core.class.Blob,
+                etag: blob.etag,
+                size: blob.size ?? 0,
+                provider: this.opt.name,
+                space: core.space.Configuration,
+                modifiedBy: core.account.ConfigUser,
+                modifiedOn: 0
+              })
+            }
+          }
+        } catch (err: any) {
+          ctx.error('Failed to get list', { error: err, workspaceId: workspaceId.name })
+        }
+        return buffer.splice(0, 50)
+      },
       close: async () => {}
     }
   }
 
@@ -131,19 +160,18 @@
     contentType: string,
     size?: number
   ): Promise<UploadedObjectInfo> {
-    const metadata: ObjectMetadata = {
+    const params: UploadObjectParams = {
       lastModified: Date.now(),
-      name: objectName,
       type: contentType,
       size
     }
 
-    await ctx.with('put', {}, (ctx) =>
-      withRetry(ctx, 5, () => this.client.putObject(ctx, workspaceId, objectName, stream, metadata, size))
+    const { etag } = await ctx.with('put', {}, (ctx) =>
+      withRetry(ctx, 5, () => this.client.putObject(ctx, workspaceId, objectName, stream, params))
     )
 
     return {
-      etag: '',
+      etag,
       versionId: ''
     }
   }
diff --git a/workers/datalake/src/blob.ts b/workers/datalake/src/blob.ts
index 98cbf414e2..fd4782cb6e 100644
--- a/workers/datalake/src/blob.ts
+++ b/workers/datalake/src/blob.ts
@@ -23,11 +23,12 @@ import { type BlobRequest, type WorkspaceRequest, type UUID } from './types'
 import { copyVideo, deleteVideo } from './video'
 import { type MetricsContext, LoggedCache } from './metrics'
 
-interface BlobMetadata {
+export interface BlobMetadata {
   lastModified: number
   type: string
   size: number
   name: string
+  etag: string
 }
 
 export function getBlobURL (request: Request, workspace: string, name: string): string {
@@ -35,6 +36,28 @@ export function getBlobURL (request: Request, workspace: string, name: string):
   return new URL(path, request.url).toString()
 }
 
+export async function handleBlobList (
+  request: WorkspaceRequest,
+  env: Env,
+  ctx: ExecutionContext,
+  metrics: MetricsContext
+): Promise<Response> {
+  const { workspace } = request
+  const cursor = extractStrParam(request.query.cursor)
+  const limit = extractIntParam(request.query.limit)
+
+  const response = await withPostgres(env, ctx, metrics, (db) => {
+    return db.listBlobs(workspace, cursor, limit)
+  })
+
+  const blobs = response.blobs.map((blob) => {
+    const { name, size, type, hash } = blob
+    return { name, size, type, etag: hash }
+  })
+
+  return json({ blobs, cursor: response.cursor })
+}
+
 export async function handleBlobGet (
   request: BlobRequest,
   env: Env,
@@ -68,13 +91,17 @@
     return error(404)
   }
 
-  const headers = r2MetadataHeaders(object)
+  const headers = r2MetadataHeaders(blob.hash, object)
   if (range !== undefined && object?.range !== undefined) {
     headers.set('Content-Range', rangeHeader(object.range, object.size))
   }
 
   const length = object?.range !== undefined && 'length' in object.range ? object?.range?.length : undefined
   const status = length !== undefined && length < object.size ? 206 : 200
+  if (length !== undefined && length < object.size) {
+    // for partial content use etag returned by R2
+    headers.set('ETag', object.httpEtag)
+  }
 
   const response = new Response(object?.body, { headers, status })
 
@@ -110,7 +137,7 @@
     return error(404)
   }
 
-  const headers = r2MetadataHeaders(head)
+  const headers = r2MetadataHeaders(blob.hash, head)
 
   return new Response(null, { headers, status: 200 })
 }
@@ -204,22 +231,33 @@
   const httpMetadata = { contentType: type, cacheControl, lastModified }
   const filename = getUniqueFilename()
 
+  const blob = await db.getBlob({ workspace, name })
+
   if (size <= hashLimit) {
     const [hashStream, uploadStream] = stream.tee()
     const hash = await getSha256(hashStream)
+
+    // Check if we have the same blob already
+    if (blob?.hash === hash && blob?.type === type) {
+      return { type, size, lastModified, name, etag: hash }
+    }
+
     const data = await db.getData({ hash, location })
     if (data !== null) {
       // Lucky boy, nothing to upload, use existing blob
       await db.createBlob({ workspace, name, hash, location })
+
+      return { type, size, lastModified, name, etag: data.hash }
     } else {
       await bucket.put(filename, uploadStream, { httpMetadata })
+
       await db.createData({ hash, location, filename, type, size })
       await db.createBlob({ workspace, name, hash, location })
-    }
 
-    return { type, size, lastModified, name }
+      return { type, size, lastModified, name, etag: hash }
+    }
   } else {
     // For large files we cannot calculate checksum beforehead
     // upload file with unique filename and then obtain checksum
@@ -229,13 +267,15 @@
       // We found an existing blob with the same hash
       // we can safely remove the existing blob from storage
       await Promise.all([bucket.delete(filename), db.createBlob({ workspace, name, hash, location })])
+
+      return { type, size, lastModified, name, etag: hash }
     } else {
       // Otherwise register a new hash and blob
       await db.createData({ hash, location, filename, type, size })
       await db.createBlob({ workspace, name, hash, location })
-    }
 
-    return { type, size, lastModified, name }
+      return { type, size, lastModified, name, etag: hash }
+    }
   }
 }
@@ -246,7 +286,7 @@
   workspace: string,
   name: string,
   filename: UUID
-): Promise<void> {
+): Promise<BlobMetadata> {
   const { location, bucket } = selectStorage(env, workspace)
 
   const object = await bucket.head(filename)
@@ -255,19 +295,20 @@
   }
 
   const hash = object.checksums.md5 !== undefined ? digestToUUID(object.checksums.md5) : (crypto.randomUUID() as UUID)
+  const size = object.size
+  const type = object.httpMetadata?.contentType ?? 'application/octet-stream'
 
   await withPostgres(env, ctx, metrics, async (db) => {
     const data = await db.getData({ hash, location })
     if (data !== null) {
       await Promise.all([bucket.delete(filename), db.createBlob({ workspace, name, hash, location })])
     } else {
-      const size = object.size
-      const type = object.httpMetadata?.contentType ?? 'application/octet-stream'
-
       await db.createData({ hash, location, filename, type, size })
       await db.createBlob({ workspace, name, hash, location })
     }
   })
+
+  return { type, size, name, etag: hash, lastModified: object.uploaded.getTime() }
 }
@@ -309,7 +350,7 @@
   return `bytes ${start}-${end - 1}/${size}`
 }
 
-function r2MetadataHeaders (head: R2Object): Headers {
+function r2MetadataHeaders (hash: string, head: R2Object): Headers {
   return head.httpMetadata !== undefined
     ? new Headers({
         'Accept-Ranges': 'bytes',
@@ -318,7 +359,7 @@
         'Content-Security-Policy': "default-src 'none';",
         'Cache-Control': head.httpMetadata.cacheControl ?? cacheControl,
         'Last-Modified': head.uploaded.toUTCString(),
-        ETag: head.httpEtag
+        ETag: hash
       })
     : new Headers({
         'Accept-Ranges': 'bytes',
@@ -326,6 +367,31 @@
         'Content-Security-Policy': "default-src 'none';",
         'Cache-Control': cacheControl,
         'Last-Modified': head.uploaded.toUTCString(),
-        ETag: head.httpEtag
+        ETag: hash
       })
 }
+
+function extractStrParam (value: string | string[] | undefined): string | undefined {
+  if (Array.isArray(value)) {
+    return value[0]
+  }
+
+  return value
+}
+
+function extractIntParam (value: string | string[] | undefined): number | undefined {
+  if (value === undefined) {
+    return undefined
+  }
+
+  if (Array.isArray(value)) {
+    value = value[0]
+  }
+
+  const intValue = Number.parseInt(value)
+  if (Number.isInteger(intValue)) {
+    return intValue
+  }
+
+  return undefined
+}
diff --git a/workers/datalake/src/db.ts b/workers/datalake/src/db.ts
index 624bf6780a..be5a03f99e 100644
--- a/workers/datalake/src/db.ts
+++ b/workers/datalake/src/db.ts
@@ -39,8 +39,11 @@ export interface BlobRecord extends BlobId {
   deleted: boolean
 }
 
-export interface BlobRecordWithFilename extends BlobRecord {
-  filename: string
+export type BlobWithDataRecord = BlobRecord & BlobDataRecord
+
+export interface ListBlobResult {
+  cursor: string | undefined
+  blobs: BlobWithDataRecord[]
 }
 
 export async function withPostgres (
@@ -72,7 +75,8 @@ export interface BlobDB {
   getData: (dataId: BlobDataId) => Promise<BlobDataRecord | null>
   createData: (data: BlobDataRecord) => Promise<void>
-  getBlob: (blobId: BlobId) => Promise<BlobRecordWithFilename | null>
+  listBlobs: (workspace: string, cursor?: string, limit?: number) => Promise<ListBlobResult>
+  getBlob: (blobId: BlobId) => Promise<BlobWithDataRecord | null>
   createBlob: (blob: Omit<BlobRecord, 'deleted'>) => Promise<void>
   deleteBlob: (blob: BlobId) => Promise<void>
 }
 
@@ -99,12 +103,12 @@
     `
   }
 
-  async getBlob (blobId: BlobId): Promise<BlobRecordWithFilename | null> {
+  async getBlob (blobId: BlobId): Promise<BlobWithDataRecord | null> {
     const { workspace, name } = blobId
 
     try {
-      const rows = await this.sql<BlobRecordWithFilename[]>`
-        SELECT b.workspace, b.name, b.hash, b.location, b.deleted, d.filename
+      const rows = await this.sql<BlobWithDataRecord[]>`
+        SELECT b.workspace, b.name, b.hash, b.location, b.deleted, d.filename, d.size, d.type
         FROM blob.blob AS b
         JOIN blob.data AS d ON b.hash = d.hash AND b.location = d.location
         WHERE b.workspace = ${workspace} AND b.name = ${name}
@@ -120,6 +124,25 @@
     return null
   }
 
+  async listBlobs (workspace: string, cursor?: string, limit?: number): Promise<ListBlobResult> {
+    cursor = cursor ?? ''
+    limit = Math.min(limit ?? 100, 1000)
+
+    const rows = await this.sql<BlobWithDataRecord[]>`
+      SELECT b.workspace, b.name, b.hash, b.location, b.deleted, d.filename, d.size, d.type
+      FROM blob.blob AS b
+      JOIN blob.data AS d ON b.hash = d.hash AND b.location = d.location
+      WHERE b.workspace = ${workspace} AND b.name > ${cursor} AND b.deleted = false
+      ORDER BY b.workspace, b.name
+      LIMIT ${limit}
+    `
+
+    return {
+      cursor: rows.length > 0 ? rows[rows.length - 1].name : undefined,
+      blobs: rows
+    }
+  }
+
   async createBlob (blob: Omit<BlobRecord, 'deleted'>): Promise<void> {
     const { workspace, name, hash, location } = blob
 
@@ -154,10 +177,14 @@
     await this.ctx.with('db.createData', () => this.db.createData(data))
   }
 
-  async getBlob (blobId: BlobId): Promise<BlobRecordWithFilename | null> {
+  async getBlob (blobId: BlobId): Promise<BlobWithDataRecord | null> {
     return await this.ctx.with('db.getBlob', () => this.db.getBlob(blobId))
   }
 
+  async listBlobs (workspace: string, cursor?: string, limit?: number): Promise<ListBlobResult> {
+    return await this.ctx.with('db.listBlobs', () => this.db.listBlobs(workspace, cursor, limit))
+  }
+
   async createBlob (blob: Omit<BlobRecord, 'deleted'>): Promise<void> {
     await this.ctx.with('db.createBlob', () => this.db.createBlob(blob))
   }
diff --git a/workers/datalake/src/encodings.ts b/workers/datalake/src/encodings.ts
index 7cf3b9347d..9fc707cece 100644
--- a/workers/datalake/src/encodings.ts
+++ b/workers/datalake/src/encodings.ts
@@ -27,8 +27,6 @@ export const toHex = (buffer: Uint8Array): string => {
     .join('')
 }
 
-export const etag = (id: string): string => `"${id}"`
-
 export function formatHexAsUUID (hexString: string): UUID {
   if (hexString.length !== 32) {
     throw new Error('Hex string must be exactly 32 characters long.')
diff --git a/workers/datalake/src/index.ts b/workers/datalake/src/index.ts
index 6857e76aeb..012b547795 100644
--- a/workers/datalake/src/index.ts
+++ b/workers/datalake/src/index.ts
@@ -16,7 +16,7 @@
 import { WorkerEntrypoint } from 'cloudflare:workers'
 import { type IRequest, type IRequestStrict, type RequestHandler, Router, error, html } from 'itty-router'
 
-import { handleBlobDelete, handleBlobGet, handleBlobHead, handleUploadFormData } from './blob'
+import { handleBlobDelete, handleBlobGet, handleBlobHead, handleBlobList, handleUploadFormData } from './blob'
 import { cors } from './cors'
 import { LoggedKVNamespace, LoggedR2Bucket, MetricsContext } from './metrics'
 import { handleImageGet } from './image'
@@ -59,6 +59,7 @@ const withBlob: RequestHandler = (request: BlobRequest) => {
 }
 
 router
+  .get('/blob/:workspace', withWorkspace, handleBlobList)
   .get('/blob/:workspace/:name', withBlob, handleBlobGet)
   .get('/blob/:workspace/:name/:filename', withBlob, handleBlobGet)
   .head('/blob/:workspace/:name', withBlob, handleBlobHead)
diff --git a/workers/datalake/src/multipart.ts b/workers/datalake/src/multipart.ts
index 92993be97e..6818705cf0 100644
--- a/workers/datalake/src/multipart.ts
+++ b/workers/datalake/src/multipart.ts
@@ -14,6 +14,7 @@
 //
 
 import { error, json } from 'itty-router'
+import { type BlobMetadata } from './blob'
 import { withPostgres } from './db'
 import { cacheControl } from './const'
 import { toUUID } from './encodings'
@@ -119,7 +120,15 @@
     }
   })
 
-  return new Response(null, { status: 204 })
+  const metadata: BlobMetadata = {
+    type,
+    size,
+    name,
+    etag: hash,
+    lastModified: object.uploaded.getTime()
+  }
+
+  return json(metadata)
 }
 
 export async function handleMultipartUploadAbort (
diff --git a/workers/datalake/src/sign.ts b/workers/datalake/src/sign.ts
index be6fcfa8d9..8a68f99b61 100644
--- a/workers/datalake/src/sign.ts
+++ b/workers/datalake/src/sign.ts
@@ -14,9 +14,9 @@
 //
 
 import { AwsClient } from 'aws4fetch'
-import { error } from 'itty-router'
+import { error, json } from 'itty-router'
 
-import { handleBlobUploaded } from './blob'
+import { type BlobMetadata, handleBlobUploaded } from './blob'
 import { type MetricsContext } from './metrics'
 import { type Storage, selectStorage } from './storage'
 import { type BlobRequest, type UUID } from './types'
@@ -108,8 +108,9 @@
     return error(400)
   }
 
+  let metadata: BlobMetadata
   try {
-    await handleBlobUploaded(env, ctx, metrics, workspace, name, uuid)
+    metadata = await handleBlobUploaded(env, ctx, metrics, workspace, name, uuid)
  } catch (err) {
     const message = err instanceof Error ? err.message : String(err)
     console.error({ error: message, workspace, name, uuid })
@@ -118,7 +119,7 @@
 
   await env.datalake_blobs.delete(key)
 
-  return new Response(null, { status: 201 })
+  return json(metadata)
 }
 
 export async function handleSignAbort (request: BlobRequest, env: Env, ctx: ExecutionContext): Promise<Response> {
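
Usage note (not part of the change set): the new listing API is cursor-based. `GET /blob/:workspace?cursor=&limit=` returns up to `limit` blobs plus a cursor for the next page, and `DatalakeClient.listObjects` surfaces that pair directly. Below is a minimal sketch of draining the cursor, assuming it lives next to server/datalake/src/client.ts so the types can be imported from './client'; the helper name `listAllBlobs` is hypothetical.

// Sketch only: iterates the cursor-based listing introduced by this change.
// Assumes placement alongside server/datalake/src/client.ts.
import { type MeasureContext, type WorkspaceId } from '@hcengineering/core'
import { type DatalakeClient, type ObjectMetadata } from './client'

async function listAllBlobs (
  ctx: MeasureContext,
  client: DatalakeClient,
  workspace: WorkspaceId
): Promise<Omit<ObjectMetadata, 'lastModified'>[]> {
  const blobs: Omit<ObjectMetadata, 'lastModified'>[] = []
  let cursor: string | undefined

  do {
    // Each page carries up to the server-side limit of blobs; an undefined cursor marks the end.
    const res = await client.listObjects(ctx, workspace, cursor)
    blobs.push(...res.blobs)
    cursor = res.cursor
  } while (cursor !== undefined)

  return blobs
}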