UBERF-8950 Expose blob list method in datalake (#7484)

Signed-off-by: Alexander Onnikov <Alexander.Onnikov@xored.com>
Alexander Onnikov 2024-12-17 14:44:03 +07:00 committed by GitHub
parent 2ea25fa4ab
commit d81137729e
8 changed files with 234 additions and 65 deletions

View File

@@ -25,9 +25,16 @@ export interface ObjectMetadata {
lastModified: number
name: string
type: string
etag: string
size?: number
}
/** @public */
export interface ListObjectOutput {
cursor: string | undefined
blobs: Omit<ObjectMetadata, 'lastModified'>[]
}
/** @public */
export interface StatObjectOutput {
lastModified: number
@@ -36,6 +43,13 @@ export interface StatObjectOutput {
size?: number
}
/** @public */
export interface UploadObjectParams {
lastModified: number
type: string
size?: number
}
interface BlobUploadError {
key: string
error: string
@@ -68,6 +82,23 @@ export class DatalakeClient {
return concatLink(this.endpoint, path)
}
async listObjects (
ctx: MeasureContext,
workspace: WorkspaceId,
cursor: string | undefined
): Promise<ListObjectOutput> {
const limit = 100
const path = `/blob/${workspace.name}`
const url = new URL(concatLink(this.endpoint, path))
url.searchParams.append('limit', String(limit))
if (cursor !== undefined) {
url.searchParams.append('cursor', cursor)
}
const response = await fetchSafe(ctx, url)
return (await response.json()) as ListObjectOutput
}
async getObject (ctx: MeasureContext, workspace: WorkspaceId, objectName: string): Promise<Readable> {
const url = this.getObjectUrl(ctx, workspace, objectName)
@@ -166,9 +197,9 @@ export class DatalakeClient {
workspace: WorkspaceId,
objectName: string,
stream: Readable | Buffer | string,
metadata: ObjectMetadata,
size?: number
): Promise<void> {
params: UploadObjectParams
): Promise<ObjectMetadata> {
let size = params.size
if (size === undefined) {
if (Buffer.isBuffer(stream)) {
size = stream.length
@@ -182,12 +213,12 @@ export class DatalakeClient {
try {
if (size === undefined || size < 64 * 1024 * 1024) {
await ctx.with('direct-upload', {}, (ctx) =>
this.uploadWithFormData(ctx, workspace, objectName, stream, metadata)
return await ctx.with('direct-upload', {}, (ctx) =>
this.uploadWithFormData(ctx, workspace, objectName, stream, { ...params, size })
)
} else {
await ctx.with('signed-url-upload', {}, (ctx) =>
this.uploadWithSignedURL(ctx, workspace, objectName, stream, metadata)
return await ctx.with('signed-url-upload', {}, (ctx) =>
this.uploadWithSignedURL(ctx, workspace, objectName, stream, { ...params, size })
)
}
} catch (err) {
@@ -201,18 +232,18 @@ export class DatalakeClient {
workspace: WorkspaceId,
objectName: string,
stream: Readable | Buffer | string,
metadata: ObjectMetadata
): Promise<void> {
params: UploadObjectParams
): Promise<ObjectMetadata> {
const path = `/upload/form-data/${workspace.name}`
const url = concatLink(this.endpoint, path)
const form = new FormData()
const options: FormData.AppendOptions = {
filename: objectName,
contentType: metadata.type,
knownLength: metadata.size,
contentType: params.type,
knownLength: params.size,
header: {
'Last-Modified': metadata.lastModified
'Last-Modified': params.lastModified
}
}
form.append('file', stream, options)
@@ -229,6 +260,8 @@ export class DatalakeClient {
if ('error' in uploadResult) {
throw new DatalakeError('Upload failed: ' + uploadResult.error)
}
return uploadResult.metadata
}
async uploadMultipart (
@@ -236,11 +269,11 @@ export class DatalakeClient {
workspace: WorkspaceId,
objectName: string,
stream: Readable | Buffer | string,
metadata: ObjectMetadata
): Promise<void> {
params: UploadObjectParams
): Promise<ObjectMetadata> {
const chunkSize = 10 * 1024 * 1024
const multipart = await this.multipartUploadStart(ctx, workspace, objectName, metadata)
const multipart = await this.multipartUploadStart(ctx, workspace, objectName, params)
try {
const parts: MultipartUploadPart[] = []
@@ -252,7 +285,7 @@ export class DatalakeClient {
partNumber++
}
await this.multipartUploadComplete(ctx, workspace, objectName, multipart, parts)
return await this.multipartUploadComplete(ctx, workspace, objectName, multipart, parts)
} catch (err: any) {
await this.multipartUploadAbort(ctx, workspace, objectName, multipart)
throw err
@@ -264,8 +297,8 @@ export class DatalakeClient {
workspace: WorkspaceId,
objectName: string,
stream: Readable | Buffer | string,
metadata: ObjectMetadata
): Promise<void> {
params: UploadObjectParams
): Promise<ObjectMetadata> {
const url = await this.signObjectSign(ctx, workspace, objectName)
try {
@@ -273,8 +306,8 @@ export class DatalakeClient {
body: stream,
method: 'PUT',
headers: {
'Content-Type': metadata.type,
'Content-Length': metadata.size?.toString() ?? '0'
'Content-Type': params.type,
'Content-Length': params.size?.toString() ?? '0'
// 'x-amz-meta-last-modified': params.lastModified.toString()
}
})
@@ -284,7 +317,7 @@ export class DatalakeClient {
throw new DatalakeError('Failed to upload via signed URL')
}
await this.signObjectComplete(ctx, workspace, objectName)
return await this.signObjectComplete(ctx, workspace, objectName)
}
async uploadFromS3 (
@@ -322,10 +355,15 @@ export class DatalakeClient {
}
}
private async signObjectComplete (ctx: MeasureContext, workspace: WorkspaceId, objectName: string): Promise<void> {
private async signObjectComplete (
ctx: MeasureContext,
workspace: WorkspaceId,
objectName: string
): Promise<ObjectMetadata> {
try {
const url = this.getSignObjectUrl(workspace, objectName)
await fetchSafe(ctx, url, { method: 'PUT' })
const res = await fetchSafe(ctx, url, { method: 'PUT' })
return (await res.json()) as ObjectMetadata
} catch (err: any) {
ctx.error('failed to complete signed url upload', { workspace, objectName, err })
throw new DatalakeError('Failed to complete signed URL upload')
@@ -353,16 +391,16 @@ export class DatalakeClient {
ctx: MeasureContext,
workspace: WorkspaceId,
objectName: string,
metadata: ObjectMetadata
params: UploadObjectParams
): Promise<MultipartUpload> {
const path = `/upload/multipart/${workspace.name}/${encodeURIComponent(objectName)}`
const url = concatLink(this.endpoint, path)
try {
const headers = {
'Content-Type': metadata.type,
'Content-Length': metadata.size?.toString() ?? '0',
'Last-Modified': new Date(metadata.lastModified).toUTCString()
'Content-Type': params.type,
'Content-Length': params.size?.toString() ?? '0',
'Last-Modified': new Date(params.lastModified).toUTCString()
}
const response = await fetchSafe(ctx, url, { method: 'POST', headers })
return (await response.json()) as MultipartUpload
@@ -401,14 +439,15 @@ export class DatalakeClient {
objectName: string,
multipart: MultipartUpload,
parts: MultipartUploadPart[]
): Promise<void> {
): Promise<ObjectMetadata> {
const path = `/upload/multipart/${workspace.name}/${encodeURIComponent(objectName)}/complete`
const url = new URL(concatLink(this.endpoint, path))
url.searchParams.append('key', multipart.key)
url.searchParams.append('uploadId', multipart.uploadId)
try {
await fetchSafe(ctx, url, { method: 'POST', body: JSON.stringify({ parts }) })
const res = await fetchSafe(ctx, url, { method: 'POST', body: JSON.stringify({ parts }) })
return (await res.json()) as ObjectMetadata
} catch (err: any) {
ctx.error('failed to complete multipart upload', { workspace, objectName, err })
throw new DatalakeError('Failed to complete multipart upload')
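
For reference, a rough usage sketch of the new listObjects API added above. The client, MeasureContext and WorkspaceId values are assumed to be constructed elsewhere, and the page size is fixed at 100 inside the client:

    // Hypothetical consumer: walk all blobs in a workspace, one page at a time
    let cursor: string | undefined
    do {
      const page = await client.listObjects(ctx, workspace, cursor)
      for (const blob of page.blobs) {
        // each entry carries name, etag, type and an optional size; lastModified is omitted
        console.log(blob.name, blob.etag, blob.size ?? 0)
      }
      cursor = page.cursor
    } while (cursor !== undefined)

The loop stops once the server returns no cursor, which happens when a page comes back empty.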

View File

@@ -18,13 +18,14 @@ import core, { type Blob, type MeasureContext, type Ref, type WorkspaceId, withC
import {
type BlobStorageIterator,
type BucketInfo,
type ListBlobResult,
type StorageAdapter,
type StorageConfig,
type StorageConfiguration,
type UploadedObjectInfo
} from '@hcengineering/server-core'
import { type Readable } from 'stream'
import { type ObjectMetadata, DatalakeClient } from './client'
import { type UploadObjectParams, DatalakeClient } from './client'
export { DatalakeClient }
@@ -88,8 +89,36 @@ export class DatalakeService implements StorageAdapter {
@withContext('listStream')
async listStream (ctx: MeasureContext, workspaceId: WorkspaceId): Promise<BlobStorageIterator> {
let hasMore = true
const buffer: ListBlobResult[] = []
let cursor: string | undefined
return {
next: async () => [],
next: async () => {
try {
while (hasMore && buffer.length < 50) {
const res = await this.client.listObjects(ctx, workspaceId, cursor)
hasMore = res.cursor !== undefined
cursor = res.cursor
for (const blob of res.blobs) {
buffer.push({
_id: blob.name as Ref<Blob>,
_class: core.class.Blob,
etag: blob.etag,
size: blob.size ?? 0,
provider: this.opt.name,
space: core.space.Configuration,
modifiedBy: core.account.ConfigUser,
modifiedOn: 0
})
}
}
} catch (err: any) {
ctx.error('Failed to get list', { error: err, workspaceId: workspaceId.name })
}
return buffer.splice(0, 50)
},
close: async () => {}
}
}
@@ -131,19 +160,18 @@
contentType: string,
size?: number
): Promise<UploadedObjectInfo> {
const metadata: ObjectMetadata = {
const params: UploadObjectParams = {
lastModified: Date.now(),
name: objectName,
type: contentType,
size
}
await ctx.with('put', {}, (ctx) =>
withRetry(ctx, 5, () => this.client.putObject(ctx, workspaceId, objectName, stream, metadata, size))
const { etag } = await ctx.with('put', {}, (ctx) =>
withRetry(ctx, 5, () => this.client.putObject(ctx, workspaceId, objectName, stream, params))
)
return {
etag: '',
etag,
versionId: ''
}
}
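
A minimal sketch of consuming the iterator returned by the listStream implementation above. The adapter, ctx and workspaceId values are assumed to exist; next() yields batches of up to 50 ListBlobResult records, and an empty batch marks the end:

    // Hypothetical consumer of the BlobStorageIterator produced by listStream
    const iterator = await adapter.listStream(ctx, workspaceId)
    try {
      while (true) {
        const batch = await iterator.next()
        if (batch.length === 0) break
        for (const blob of batch) {
          console.log(blob._id, blob.etag, blob.size)
        }
      }
    } finally {
      await iterator.close()
    }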

View File

@@ -23,11 +23,12 @@ import { type BlobRequest, type WorkspaceRequest, type UUID } from './types'
import { copyVideo, deleteVideo } from './video'
import { type MetricsContext, LoggedCache } from './metrics'
interface BlobMetadata {
export interface BlobMetadata {
lastModified: number
type: string
size: number
name: string
etag: string
}
export function getBlobURL (request: Request, workspace: string, name: string): string {
@@ -35,6 +36,28 @@ export function getBlobURL (request: Request, workspace: string, name: string):
return new URL(path, request.url).toString()
}
export async function handleBlobList (
request: WorkspaceRequest,
env: Env,
ctx: ExecutionContext,
metrics: MetricsContext
): Promise<Response> {
const { workspace } = request
const cursor = extractStrParam(request.query.cursor)
const limit = extractIntParam(request.query.limit)
const response = await withPostgres(env, ctx, metrics, (db) => {
return db.listBlobs(workspace, cursor, limit)
})
const blobs = response.blobs.map((blob) => {
const { name, size, type, hash } = blob
return { name, size, type, etag: hash }
})
return json({ blobs, cursor: response.cursor })
}
export async function handleBlobGet (
request: BlobRequest,
env: Env,
@@ -68,13 +91,17 @@ export async function handleBlobGet (
return error(404)
}
const headers = r2MetadataHeaders(object)
const headers = r2MetadataHeaders(blob.hash, object)
if (range !== undefined && object?.range !== undefined) {
headers.set('Content-Range', rangeHeader(object.range, object.size))
}
const length = object?.range !== undefined && 'length' in object.range ? object?.range?.length : undefined
const status = length !== undefined && length < object.size ? 206 : 200
if (length !== undefined && length < object.size) {
// for partial content use etag returned by R2
headers.set('ETag', object.httpEtag)
}
const response = new Response(object?.body, { headers, status })
@@ -110,7 +137,7 @@ export async function handleBlobHead (
return error(404)
}
const headers = r2MetadataHeaders(head)
const headers = r2MetadataHeaders(blob.hash, head)
return new Response(null, { headers, status: 200 })
}
@@ -204,22 +231,33 @@ export async function saveBlob (
const httpMetadata = { contentType: type, cacheControl, lastModified }
const filename = getUniqueFilename()
const blob = await db.getBlob({ workspace, name })
if (size <= hashLimit) {
const [hashStream, uploadStream] = stream.tee()
const hash = await getSha256(hashStream)
// Check if we have the same blob already
if (blob?.hash === hash && blob?.type === type) {
return { type, size, lastModified, name, etag: hash }
}
const data = await db.getData({ hash, location })
if (data !== null) {
// Lucky boy, nothing to upload, use existing blob
await db.createBlob({ workspace, name, hash, location })
return { type, size, lastModified, name, etag: data.hash }
} else {
await bucket.put(filename, uploadStream, { httpMetadata })
await db.createData({ hash, location, filename, type, size })
await db.createBlob({ workspace, name, hash, location })
}
return { type, size, lastModified, name }
return { type, size, lastModified, name, etag: hash }
}
} else {
// For large files we cannot calculate checksum beforehand
// upload file with unique filename and then obtain checksum
@@ -229,13 +267,15 @@
// We found an existing blob with the same hash
// we can safely remove the existing blob from storage
await Promise.all([bucket.delete(filename), db.createBlob({ workspace, name, hash, location })])
return { type, size, lastModified, name, etag: hash }
} else {
// Otherwise register a new hash and blob
await db.createData({ hash, location, filename, type, size })
await db.createBlob({ workspace, name, hash, location })
}
return { type, size, lastModified, name }
return { type, size, lastModified, name, etag: hash }
}
}
}
@@ -246,7 +286,7 @@ export async function handleBlobUploaded (
workspace: string,
name: string,
filename: UUID
): Promise<void> {
): Promise<BlobMetadata> {
const { location, bucket } = selectStorage(env, workspace)
const object = await bucket.head(filename)
@@ -255,19 +295,20 @@
}
const hash = object.checksums.md5 !== undefined ? digestToUUID(object.checksums.md5) : (crypto.randomUUID() as UUID)
const size = object.size
const type = object.httpMetadata?.contentType ?? 'application/octet-stream'
await withPostgres(env, ctx, metrics, async (db) => {
const data = await db.getData({ hash, location })
if (data !== null) {
await Promise.all([bucket.delete(filename), db.createBlob({ workspace, name, hash, location })])
} else {
const size = object.size
const type = object.httpMetadata?.contentType ?? 'application/octet-stream'
await db.createData({ hash, location, filename, type, size })
await db.createBlob({ workspace, name, hash, location })
}
})
return { type, size, name, etag: hash, lastModified: object.uploaded.getTime() }
}
async function uploadLargeFile (
@@ -309,7 +350,7 @@ function rangeHeader (range: R2Range, size: number): string {
return `bytes ${start}-${end - 1}/${size}`
}
function r2MetadataHeaders (head: R2Object): Headers {
function r2MetadataHeaders (hash: string, head: R2Object): Headers {
return head.httpMetadata !== undefined
? new Headers({
'Accept-Ranges': 'bytes',
@@ -318,7 +359,7 @@
'Content-Security-Policy': "default-src 'none';",
'Cache-Control': head.httpMetadata.cacheControl ?? cacheControl,
'Last-Modified': head.uploaded.toUTCString(),
ETag: head.httpEtag
ETag: hash
})
: new Headers({
'Accept-Ranges': 'bytes',
@@ -326,6 +367,31 @@
'Content-Security-Policy': "default-src 'none';",
'Cache-Control': cacheControl,
'Last-Modified': head.uploaded.toUTCString(),
ETag: head.httpEtag
ETag: hash
})
}
function extractStrParam (value: string | string[] | undefined): string | undefined {
if (Array.isArray(value)) {
return value[0]
}
return value
}
function extractIntParam (value: string | string[] | undefined): number | undefined {
if (value === undefined) {
return undefined
}
if (Array.isArray(value)) {
value = value[0]
}
const intValue = Number.parseInt(value)
if (Number.isInteger(intValue)) {
return intValue
}
return undefined
}

View File

@@ -39,8 +39,11 @@ export interface BlobRecord extends BlobId {
deleted: boolean
}
export interface BlobRecordWithFilename extends BlobRecord {
filename: string
export type BlobWithDataRecord = BlobRecord & BlobDataRecord
export interface ListBlobResult {
cursor: string | undefined
blobs: BlobWithDataRecord[]
}
export async function withPostgres<T> (
@@ -72,7 +75,8 @@ export async function withPostgres<T> (
export interface BlobDB {
getData: (dataId: BlobDataId) => Promise<BlobDataRecord | null>
createData: (data: BlobDataRecord) => Promise<void>
getBlob: (blobId: BlobId) => Promise<BlobRecordWithFilename | null>
listBlobs: (workspace: string, cursor?: string, limit?: number) => Promise<ListBlobResult>
getBlob: (blobId: BlobId) => Promise<BlobWithDataRecord | null>
createBlob: (blob: Omit<BlobRecord, 'filename' | 'deleted'>) => Promise<void>
deleteBlob: (blob: BlobId) => Promise<void>
}
@@ -99,12 +103,12 @@ export class PostgresDB implements BlobDB {
`
}
async getBlob (blobId: BlobId): Promise<BlobRecordWithFilename | null> {
async getBlob (blobId: BlobId): Promise<BlobWithDataRecord | null> {
const { workspace, name } = blobId
try {
const rows = await this.sql<BlobRecordWithFilename[]>`
SELECT b.workspace, b.name, b.hash, b.location, b.deleted, d.filename
const rows = await this.sql<BlobWithDataRecord[]>`
SELECT b.workspace, b.name, b.hash, b.location, b.deleted, d.filename, d.size, d.type
FROM blob.blob AS b
JOIN blob.data AS d ON b.hash = d.hash AND b.location = d.location
WHERE b.workspace = ${workspace} AND b.name = ${name}
@@ -120,6 +124,25 @@ export class PostgresDB implements BlobDB {
return null
}
async listBlobs (workspace: string, cursor?: string, limit?: number): Promise<ListBlobResult> {
cursor = cursor ?? ''
limit = Math.min(limit ?? 100, 1000)
const rows = await this.sql<BlobWithDataRecord[]>`
SELECT b.workspace, b.name, b.hash, b.location, b.deleted, d.filename, d.size, d.type
FROM blob.blob AS b
JOIN blob.data AS d ON b.hash = d.hash AND b.location = d.location
WHERE b.workspace = ${workspace} AND b.name > ${cursor} AND b.deleted = false
ORDER BY b.workspace, b.name
LIMIT ${limit}
`
return {
cursor: rows.length > 0 ? rows[rows.length - 1].name : undefined,
blobs: rows
}
}
async createBlob (blob: Omit<BlobRecord, 'filename' | 'deleted'>): Promise<void> {
const { workspace, name, hash, location } = blob
@@ -154,10 +177,14 @@ export class LoggedDB implements BlobDB {
await this.ctx.with('db.createData', () => this.db.createData(data))
}
async getBlob (blobId: BlobId): Promise<BlobRecordWithFilename | null> {
async getBlob (blobId: BlobId): Promise<BlobWithDataRecord | null> {
return await this.ctx.with('db.getBlob', () => this.db.getBlob(blobId))
}
async listBlobs (workspace: string, cursor?: string, limit?: number): Promise<ListBlobResult> {
return await this.ctx.with('db.listBlobs', () => this.db.listBlobs(workspace, cursor, limit))
}
async createBlob (blob: Omit<BlobRecord, 'filename' | 'deleted'>): Promise<void> {
await this.ctx.with('db.createBlob', () => this.db.createBlob(blob))
}
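
The listBlobs query above implements keyset pagination: the returned cursor is simply the name of the last row on the page, and the next page is selected with b.name > cursor in the same workspace/name order. A rough sketch, assuming db is any BlobDB implementation and the workspace name is made up:

    // Hypothetical: the second page continues after the last name returned by the first
    const first = await db.listBlobs('my-workspace', undefined, 100)
    const second = await db.listBlobs('my-workspace', first.cursor, 100)

Note that the cursor stays defined until a page comes back empty, so a caller draining the whole listing makes one final empty request before it can stop.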

View File

@@ -27,8 +27,6 @@ export const toHex = (buffer: Uint8Array): string => {
.join('')
}
export const etag = (id: string): string => `"${id}"`
export function formatHexAsUUID (hexString: string): UUID {
if (hexString.length !== 32) {
throw new Error('Hex string must be exactly 32 characters long.')

View File

@@ -16,7 +16,7 @@
import { WorkerEntrypoint } from 'cloudflare:workers'
import { type IRequest, type IRequestStrict, type RequestHandler, Router, error, html } from 'itty-router'
import { handleBlobDelete, handleBlobGet, handleBlobHead, handleUploadFormData } from './blob'
import { handleBlobDelete, handleBlobGet, handleBlobHead, handleBlobList, handleUploadFormData } from './blob'
import { cors } from './cors'
import { LoggedKVNamespace, LoggedR2Bucket, MetricsContext } from './metrics'
import { handleImageGet } from './image'
@@ -59,6 +59,7 @@ const withBlob: RequestHandler<BlobRequest> = (request: BlobRequest) => {
}
router
.get('/blob/:workspace', withWorkspace, handleBlobList)
.get('/blob/:workspace/:name', withBlob, handleBlobGet)
.get('/blob/:workspace/:name/:filename', withBlob, handleBlobGet)
.head('/blob/:workspace/:name', withBlob, handleBlobHead)
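
The new route can also be exercised over plain HTTP. A hedged sketch against a hypothetical deployment (the host and workspace name are made up, any required authentication is omitted, and the response shape follows handleBlobList):

    // Hypothetical request to the new list endpoint
    const url = new URL('https://datalake.example.com/blob/my-workspace')
    url.searchParams.set('limit', '100')
    // url.searchParams.set('cursor', previousCursor) // resume from an earlier page if needed
    const res = await fetch(url)
    const { blobs, cursor } = await res.json()
    // blobs: [{ name, size, type, etag }]; cursor: name of the last blob, absent once a page is empty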

View File

@@ -14,6 +14,7 @@
//
import { error, json } from 'itty-router'
import { type BlobMetadata } from './blob'
import { withPostgres } from './db'
import { cacheControl } from './const'
import { toUUID } from './encodings'
@@ -119,7 +120,15 @@ export async function handleMultipartUploadComplete (
}
})
return new Response(null, { status: 204 })
const metadata: BlobMetadata = {
type,
size,
name,
etag: hash,
lastModified: object.uploaded.getTime()
}
return json(metadata)
}
export async function handleMultipartUploadAbort (

View File

@@ -14,9 +14,9 @@
//
import { AwsClient } from 'aws4fetch'
import { error } from 'itty-router'
import { error, json } from 'itty-router'
import { handleBlobUploaded } from './blob'
import { type BlobMetadata, handleBlobUploaded } from './blob'
import { type MetricsContext } from './metrics'
import { type Storage, selectStorage } from './storage'
import { type BlobRequest, type UUID } from './types'
@ -108,8 +108,9 @@ export async function handleSignComplete (
return error(400)
}
let metadata: BlobMetadata
try {
await handleBlobUploaded(env, ctx, metrics, workspace, name, uuid)
metadata = await handleBlobUploaded(env, ctx, metrics, workspace, name, uuid)
} catch (err) {
const message = err instanceof Error ? err.message : String(err)
console.error({ error: message, workspace, name, uuid })
@@ -118,7 +119,7 @@
await env.datalake_blobs.delete(key)
return new Response(null, { status: 201 })
return json(metadata)
}
export async function handleSignAbort (request: BlobRequest, env: Env, ctx: ExecutionContext): Promise<Response> {