diff --git a/workers/datalake/src/blob.ts b/workers/datalake/src/blob.ts
index 7f118e3a90..3fd6fcd16f 100644
--- a/workers/datalake/src/blob.ts
+++ b/workers/datalake/src/blob.ts
@@ -14,14 +14,15 @@
 //
 
 import { error, json } from 'itty-router'
-import postgres from 'postgres'
-import * as db from './db'
+import { type Sql } from 'postgres'
+import db, { withPostgres } from './db'
 import { cacheControl, hashLimit } from './const'
 import { toUUID } from './encodings'
 import { getSha256 } from './hash'
 import { selectStorage } from './storage'
 import { type BlobRequest, type WorkspaceRequest, type UUID } from './types'
 import { copyVideo, deleteVideo } from './video'
+import { measure, LoggedCache } from './measure'
 
 interface BlobMetadata {
   lastModified: number
@@ -38,20 +39,22 @@ export function getBlobURL (request: Request, workspace: string, name: string):
 
 export async function handleBlobGet (request: BlobRequest, env: Env, ctx: ExecutionContext): Promise<Response> {
   const { workspace, name } = request
-  const sql = postgres(env.HYPERDRIVE.connectionString)
-  const { bucket } = selectStorage(env, workspace)
-
-  const blob = await db.getBlob(sql, { workspace, name })
-  if (blob === null || blob.deleted) {
-    return error(404)
-  }
-
-  const cache = caches.default
+  const cache = new LoggedCache(caches.default)
   const cached = await cache.match(request)
   if (cached !== undefined) {
+    console.log({ message: 'cache hit' })
     return cached
   }
 
+  const { bucket } = selectStorage(env, workspace)
+
+  const blob = await withPostgres(env, ctx, (sql) => {
+    return db.getBlob(sql, { workspace, name })
+  })
+  if (blob === null || blob.deleted) {
+    return error(404)
+  }
+
   const range = request.headers.has('Range') ? request.headers : undefined
   const object = await bucket.get(blob.filename, { range })
   if (object === null) {
@@ -67,6 +70,7 @@ export async function handleBlobGet (request: BlobRequest, env: Env, ctx: Execut
   const status = length !== undefined && length < object.size ? 206 : 200
 
   const response = new Response(object?.body, { headers, status })
+
   if (response.status === 200) {
     ctx.waitUntil(cache.put(request, response.clone()))
   }
@@ -77,10 +81,11 @@ export async function handleBlobGet (request: BlobRequest, env: Env, ctx: Execut
 
 export async function handleBlobHead (request: BlobRequest, env: Env, ctx: ExecutionContext): Promise<Response> {
   const { workspace, name } = request
-  const sql = postgres(env.HYPERDRIVE.connectionString)
   const { bucket } = selectStorage(env, workspace)
 
-  const blob = await db.getBlob(sql, { workspace, name })
+  const blob = await withPostgres(env, ctx, (sql) => {
+    return db.getBlob(sql, { workspace, name })
+  })
   if (blob === null || blob.deleted) {
     return error(404)
   }
@@ -97,10 +102,10 @@ export async function handleBlobHead (request: BlobRequest, env: Env, ctx: Execu
 
 export async function handleBlobDelete (request: BlobRequest, env: Env, ctx: ExecutionContext): Promise<Response> {
   const { workspace, name } = request
-  const sql = postgres(env.HYPERDRIVE.connectionString)
-
   try {
-    await Promise.all([db.deleteBlob(sql, { workspace, name }), deleteVideo(env, workspace, name)])
+    await withPostgres(env, ctx, (sql) => {
+      return Promise.all([db.deleteBlob(sql, { workspace, name }), deleteVideo(env, workspace, name)])
+    })
 
     return new Response(null, { status: 204 })
   } catch (err: any) {
@@ -110,7 +115,11 @@ export async function handleBlobDelete (request: BlobRequest, env: Env, ctx: Exe
   }
 }
 
-export async function handleUploadFormData (request: WorkspaceRequest, env: Env): Promise<Response> {
+export async function handleUploadFormData (
+  request: WorkspaceRequest,
+  env: Env,
+  ctx: ExecutionContext
+): Promise<Response> {
   const contentType = request.headers.get('Content-Type')
   if (contentType === null || !contentType.includes('multipart/form-data')) {
     console.error({ error: 'expected multipart/form-data' })
@@ -119,11 +128,9 @@ export async function handleUploadFormData (request: WorkspaceRequest, env: Env)
 
   const { workspace } = request
 
-  const sql = postgres(env.HYPERDRIVE.connectionString)
-
   let formData: FormData
   try {
-    formData = await request.formData()
+    formData = await measure('fetch formdata', () => request.formData())
   } catch (err: any) {
     const message = err instanceof Error ? err.message : String(err)
     console.error({ error: 'failed to parse form data', message })
@@ -139,7 +146,9 @@ export async function handleUploadFormData (request: WorkspaceRequest, env: Env)
     files.map(async ([file, key]) => {
       const { name, type, lastModified } = file
       try {
-        const metadata = await saveBlob(env, sql, file.stream(), file.size, type, workspace, name, lastModified)
+        const metadata = await withPostgres(env, ctx, (sql) => {
+          return saveBlob(env, sql, file.stream(), file.size, type, workspace, name, lastModified)
+        })
 
         // TODO this probably should happen via queue, let it be here for now
         if (type.startsWith('video/')) {
@@ -161,7 +170,7 @@ export async function handleUploadFormData (request: WorkspaceRequest, env: Env)
 
 export async function saveBlob (
   env: Env,
-  sql: postgres.Sql,
+  sql: Sql,
   stream: ReadableStream,
   size: number,
   type: string,
@@ -179,6 +188,7 @@ export async function saveBlob (
   const hash = await getSha256(hashStream)
 
   const data = await db.getData(sql, { hash, location })
+
   if (data !== null) {
     // Lucky boy, nothing to upload, use existing blob
     await db.createBlob(sql, { workspace, name, hash, location })
@@ -212,8 +222,13 @@ export async function saveBlob (
   }
 }
 
-export async function handleBlobUploaded (env: Env, workspace: string, name: string, filename: UUID): Promise<void> {
-  const sql = postgres(env.HYPERDRIVE.connectionString)
+export async function handleBlobUploaded (
+  env: Env,
+  ctx: ExecutionContext,
+  workspace: string,
+  name: string,
+  filename: UUID
+): Promise<void> {
   const { location, bucket } = selectStorage(env, workspace)
 
   const object = await bucket.head(filename)
@@ -223,16 +238,20 @@ export async function handleBlobUploaded (env: Env, workspace: string, name: str
 
   const hash = object.checksums.md5 !== undefined ? digestToUUID(object.checksums.md5) : (crypto.randomUUID() as UUID)
 
-  const data = await db.getData(sql, { hash, location })
-  if (data !== null) {
-    await Promise.all([bucket.delete(filename), db.createBlob(sql, { workspace, name, hash, location })])
-  } else {
-    const size = object.size
-    const type = object.httpMetadata.contentType ?? 'application/octet-stream'
+  await withPostgres(env, ctx, async (sql) => {
+    const data = await db.getData(sql, { hash, location })
+    if (data !== null) {
+      await Promise.all([bucket.delete(filename), db.createBlob(sql, { workspace, name, hash, location })])
+    } else {
+      const size = object.size
+      const type = object.httpMetadata?.contentType ?? 'application/octet-stream'
 
-    await db.createData(sql, { hash, location, filename, type, size })
-    await db.createBlob(sql, { workspace, name, hash, location })
-  }
+      await sql.begin((sql) => [
+        db.createData(sql, { hash, location, filename, type, size }),
+        db.createBlob(sql, { workspace, name, hash, location })
+      ])
+    }
+  })
 }
 
 async function uploadLargeFile (
diff --git a/workers/datalake/src/db.ts b/workers/datalake/src/db.ts
index 725c21c62c..6aad3e3be8 100644
--- a/workers/datalake/src/db.ts
+++ b/workers/datalake/src/db.ts
@@ -13,7 +13,8 @@
 // limitations under the License.
 //
 
-import type postgres from 'postgres'
+import postgres from 'postgres'
+import { measure, measureSync } from './measure'
 import { type Location, type UUID } from './types'
 
 export interface BlobDataId {
@@ -42,63 +43,94 @@ export interface BlobRecordWithFilename extends BlobRecord {
   filename: string
 }
 
-export async function getData (sql: postgres.Sql, dataId: BlobDataId): Promise<BlobDataRecord | null> {
-  const { hash, location } = dataId
-
-  const rows = await sql<BlobDataRecord[]>`
-    SELECT hash, location, filename, size, type
-    FROM blob.data
-    WHERE hash = ${hash} AND location = ${location}
-  `
-
-  if (rows.length > 0) {
-    return rows[0]
+export async function withPostgres<T> (
+  env: Env,
+  ctx: ExecutionContext,
+  fn: (sql: postgres.Sql) => Promise<T>
+): Promise<T> {
+  const sql = measureSync('db.connect', () => {
+    return postgres(env.HYPERDRIVE.connectionString)
+  })
+  try {
+    return await fn(sql)
+  } finally {
+    measureSync('db.close', () => {
+      ctx.waitUntil(sql.end({ timeout: 0 }))
+    })
   }
-
-  return null
 }
 
-export async function createData (sql: postgres.Sql, data: BlobDataRecord): Promise<void> {
-  const { hash, location, filename, size, type } = data
-
-  await sql`
-    UPSERT INTO blob.data (hash, location, filename, size, type)
-    VALUES (${hash}, ${location}, ${filename}, ${size}, ${type})
-  `
+export interface BlobDB {
+  getData: (sql: postgres.Sql, dataId: BlobDataId) => Promise<BlobDataRecord | null>
+  createData: (sql: postgres.Sql, data: BlobDataRecord) => Promise<void>
+  getBlob: (sql: postgres.Sql, blobId: BlobId) => Promise<BlobRecordWithFilename | null>
+  createBlob: (sql: postgres.Sql, blob: Omit<BlobRecord, 'deleted'>) => Promise<void>
+  deleteBlob: (sql: postgres.Sql, blob: BlobId) => Promise<void>
 }
 
-export async function getBlob (sql: postgres.Sql, blobId: BlobId): Promise<BlobRecordWithFilename | null> {
-  const { workspace, name } = blobId
-
-  const rows = await sql<BlobRecordWithFilename[]>`
-    SELECT b.workspace, b.name, b.hash, b.location, b.deleted, d.filename
-    FROM blob.blob AS b
-    JOIN blob.data AS d ON b.hash = d.hash AND b.location = d.location
-    WHERE b.workspace = ${workspace} AND b.name = ${name}
-  `
-
-  if (rows.length > 0) {
-    return rows[0]
+const db: BlobDB = {
+  async getData (sql: postgres.Sql, dataId: BlobDataId): Promise<BlobDataRecord | null> {
+    const { hash, location } = dataId
+    const rows = await sql<BlobDataRecord[]>`
+      SELECT hash, location, filename, size, type
+      FROM blob.data
+      WHERE hash = ${hash} AND location = ${location}
+    `
+    return rows.length > 0 ? rows[0] : null
+  },
+
+  async createData (sql: postgres.Sql, data: BlobDataRecord): Promise<void> {
+    const { hash, location, filename, size, type } = data
+
+    await sql`
+      UPSERT INTO blob.data (hash, location, filename, size, type)
+      VALUES (${hash}, ${location}, ${filename}, ${size}, ${type})
+    `
+  },
+
+  async getBlob (sql: postgres.Sql, blobId: BlobId): Promise<BlobRecordWithFilename | null> {
+    const { workspace, name } = blobId
+
+    const rows = await sql<BlobRecordWithFilename[]>`
+      SELECT b.workspace, b.name, b.hash, b.location, b.deleted, d.filename
+      FROM blob.blob AS b
+      JOIN blob.data AS d ON b.hash = d.hash AND b.location = d.location
+      WHERE b.workspace = ${workspace} AND b.name = ${name}
+    `
+
+    if (rows.length > 0) {
+      return rows[0]
+    }
+
+    return null
+  },
+
+  async createBlob (sql: postgres.Sql, blob: Omit<BlobRecord, 'deleted'>): Promise<void> {
+    const { workspace, name, hash, location } = blob
+
+    await sql`
+      UPSERT INTO blob.blob (workspace, name, hash, location, deleted)
+      VALUES (${workspace}, ${name}, ${hash}, ${location}, false)
+    `
+  },
+
+  async deleteBlob (sql: postgres.Sql, blob: BlobId): Promise<void> {
+    const { workspace, name } = blob
+
+    await sql`
+      UPDATE blob.blob
+      SET deleted = true
+      WHERE workspace = ${workspace} AND name = ${name}
+    `
   }
-
-  return null
 }
 
-export async function createBlob (sql: postgres.Sql, blob: Omit<BlobRecord, 'deleted'>): Promise<void> {
-  const { workspace, name, hash, location } = blob
-
-  await sql`
-    UPSERT INTO blob.blob (workspace, name, hash, location, deleted)
-    VALUES (${workspace}, ${name}, ${hash}, ${location}, false)
-  `
+export const measuredDb: BlobDB = {
+  getData: (sql, dataId) => measure('db.getData', () => db.getData(sql, dataId)),
+  createData: (sql, data) => measure('db.createData', () => db.createData(sql, data)),
+  getBlob: (sql, blobId) => measure('db.getBlob', () => db.getBlob(sql, blobId)),
+  createBlob: (sql, blob) => measure('db.createBlob', () => db.createBlob(sql, blob)),
+  deleteBlob: (sql, blob) => measure('db.deleteBlob', () => db.deleteBlob(sql, blob))
 }
 
-export async function deleteBlob (sql: postgres.Sql, blob: BlobId): Promise<void> {
-  const { workspace, name } = blob
-
-  await sql`
-    UPDATE blob.blob
-    SET deleted = true
-    WHERE workspace = ${workspace} AND name = ${name}
-  `
-}
+export default measuredDb
diff --git a/workers/datalake/src/index.ts b/workers/datalake/src/index.ts
index d61b35e70f..4dcf33df06 100644
--- a/workers/datalake/src/index.ts
+++ b/workers/datalake/src/index.ts
@@ -18,6 +18,7 @@ import { type IRequestStrict, type RequestHandler, Router, error, html } from 'i
 
 import { handleBlobDelete, handleBlobGet, handleBlobHead, handleUploadFormData } from './blob'
 import { cors } from './cors'
+import { LoggedKVNamespace, LoggedR2Bucket, requestTimeAfter, requestTimeBefore } from './measure'
 import { handleImageGet } from './image'
 import { handleS3Blob } from './s3'
 import { handleVideoMetaGet } from './video'
@@ -35,8 +36,8 @@ const { preflight, corsify } = cors({
 })
 
 const router = Router<IRequestStrict, [Env, ExecutionContext]>({
-  before: [preflight],
-  finally: [corsify]
+  before: [preflight, requestTimeBefore],
+  finally: [corsify, requestTimeAfter]
 })
 
 const withWorkspace: RequestHandler<WorkspaceRequest> = (request: WorkspaceRequest) => {
@@ -87,6 +88,19 @@ router
   .all('*', () => error(404))
 
 export default class DatalakeWorker extends WorkerEntrypoint<Env> {
+  constructor (ctx: ExecutionContext, env: Env) {
+    env = {
+      ...env,
+      datalake_blobs: new LoggedKVNamespace(env.datalake_blobs),
+      DATALAKE_APAC: new LoggedR2Bucket(env.DATALAKE_APAC),
+      DATALAKE_EEUR: new LoggedR2Bucket(env.DATALAKE_EEUR),
+      DATALAKE_WEUR: new LoggedR2Bucket(env.DATALAKE_WEUR),
+      DATALAKE_ENAM: new LoggedR2Bucket(env.DATALAKE_ENAM),
+      DATALAKE_WNAM: new LoggedR2Bucket(env.DATALAKE_WNAM)
+    }
+    super(ctx, env)
+  }
+
   async fetch (request: Request): Promise<Response> {
     return await router.fetch(request, this.env, this.ctx).catch(error)
   }
diff --git a/workers/datalake/src/measure.ts b/workers/datalake/src/measure.ts
new file mode 100644
index 0000000000..385f3b44e8
--- /dev/null
+++ b/workers/datalake/src/measure.ts
@@ -0,0 +1,177 @@
+//
+// Copyright © 2024 Hardcore Engineering Inc.
+//
+// Licensed under the Eclipse Public License, Version 2.0 (the 'License');
+// you may not use this file except in compliance with the License. You may
+// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an 'AS IS' BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+import { type IRequest, type ResponseHandler, type RequestHandler } from 'itty-router'
+
+export async function measure<T> (label: string, fn: () => Promise<T>): Promise<T> {
+  const start = performance.now()
+  try {
+    return await fn()
+  } finally {
+    const duration = performance.now() - start
+    console.log({ stage: label, duration })
+  }
+}
+
+export function measureSync<T> (label: string, fn: () => T): T {
+  const start = performance.now()
+  try {
+    return fn()
+  } finally {
+    const duration = performance.now() - start
+    console.log({ stage: label, duration })
+  }
+}
+
+export const requestTimeBefore: RequestHandler = async (request: IRequest) => {
+  request.startTime = performance.now()
+}
+
+export const requestTimeAfter: ResponseHandler = async (response: Response, request: IRequest) => {
+  const duration = performance.now() - request.startTime
+  console.log({ stage: 'total', duration })
+}
+
+export class LoggedR2Bucket implements R2Bucket {
+  constructor (private readonly bucket: R2Bucket) {}
+
+  async head (key: string): Promise<R2Object | null> {
+    return await measure('r2.head', () => this.bucket.head(key))
+  }
+
+  async get (
+    key: string,
+    options?: R2GetOptions & {
+      onlyIf?: R2Conditional | Headers
+    }
+  ): Promise<R2ObjectBody | null> {
+    return await measure('r2.get', () => this.bucket.get(key, options))
+  }
+
+  async put (
+    key: string,
+    value: ReadableStream | ArrayBuffer | ArrayBufferView | string | null | Blob,
+    options?: R2PutOptions & {
+      onlyIf?: R2Conditional | Headers
+    }
+  ): Promise<R2Object> {
+    return await measure('r2.put', () => this.bucket.put(key, value, options))
+  }
+
+  async createMultipartUpload (key: string, options?: R2MultipartOptions): Promise<R2MultipartUpload> {
+    return await measure('r2.createMultipartUpload', () => this.bucket.createMultipartUpload(key, options))
+  }
+
+  resumeMultipartUpload (key: string, uploadId: string): R2MultipartUpload {
+    return measureSync('r2.resumeMultipartUpload', () => this.bucket.resumeMultipartUpload(key, uploadId))
+  }
+
+  async delete (keys: string | string[]): Promise<void> {
+    await measure('r2.delete', () => this.bucket.delete(keys))
+  }
+
+  async list (options?: R2ListOptions): Promise<R2Objects> {
+    return await measure('r2.list', () => this.bucket.list(options))
+  }
+}
+
+export class LoggedKVNamespace implements KVNamespace {
+  constructor (private readonly kv: KVNamespace) {}
+
+  get (key: string, options?: Partial<KVNamespaceGetOptions<undefined>>): Promise<string | null>
+  get (key: string, type: 'text'): Promise<string | null>
+  get<ExpectedValue = unknown>(key: string, type: 'json'): Promise<ExpectedValue | null>
+  get (key: string, type: 'arrayBuffer'): Promise<ArrayBuffer | null>
+  get (key: string, type: 'stream'): Promise<ReadableStream | null>
+  get (key: string, options?: KVNamespaceGetOptions<'text'>): Promise<string | null>
+  get<ExpectedValue = unknown>(key: string, options?: KVNamespaceGetOptions<'json'>): Promise<ExpectedValue | null>
+  get (key: string, options?: KVNamespaceGetOptions<'arrayBuffer'>): Promise<ArrayBuffer | null>
+  get (key: string, options?: KVNamespaceGetOptions<'stream'>): Promise<ReadableStream | null>
+  async get (key: string, options?: any): Promise<any> {
+    return await measure('kv.get', () => this.kv.get(key, options))
+  }
+
+  getWithMetadata<Metadata = unknown>(
+    key: string,
+    options?: Partial<KVNamespaceGetOptions<undefined>>
+  ): Promise<KVNamespaceGetWithMetadataResult<string, Metadata>>
+  getWithMetadata<Metadata = unknown>(
+    key: string,
+    type: 'text'
+  ): Promise<KVNamespaceGetWithMetadataResult<string, Metadata>>
+  getWithMetadata<ExpectedValue = unknown, Metadata = unknown>(
+    key: string,
+    type: 'json'
+  ): Promise<KVNamespaceGetWithMetadataResult<ExpectedValue, Metadata>>
+  getWithMetadata<Metadata = unknown>(
+    key: string,
+    type: 'arrayBuffer'
+  ): Promise<KVNamespaceGetWithMetadataResult<ArrayBuffer, Metadata>>
+  getWithMetadata<Metadata = unknown>(
+    key: string,
+    type: 'stream'
+  ): Promise<KVNamespaceGetWithMetadataResult<ReadableStream, Metadata>>
+  getWithMetadata<Metadata = unknown>(
+    key: string,
+    options?: KVNamespaceGetOptions<'text'>
+  ): Promise<KVNamespaceGetWithMetadataResult<string, Metadata>>
+  getWithMetadata<ExpectedValue = unknown, Metadata = unknown>(
+    key: string,
+    options?: KVNamespaceGetOptions<'json'>
+  ): Promise<KVNamespaceGetWithMetadataResult<ExpectedValue, Metadata>>
+  getWithMetadata<Metadata = unknown>(
+    key: string,
+    options?: KVNamespaceGetOptions<'arrayBuffer'>
+  ): Promise<KVNamespaceGetWithMetadataResult<ArrayBuffer, Metadata>>
+  getWithMetadata<Metadata = unknown>(
+    key: string,
+    options?: KVNamespaceGetOptions<'stream'>
+  ): Promise<KVNamespaceGetWithMetadataResult<ReadableStream, Metadata>>
+  async getWithMetadata (key: string, options?: any): Promise<any> {
+    return await measure('kv.getWithMetadata', () => this.kv.getWithMetadata(key, options))
+  }
+
+  async list<Metadata = unknown>(options?: KVNamespaceListOptions): Promise<KVNamespaceListResult<Metadata>> {
+    return await measure('kv.list', () => this.kv.list(options))
+  }
+
+  async put (
+    key: string,
+    value: string | ArrayBuffer | ArrayBufferView | ReadableStream,
+    options?: KVNamespacePutOptions
+  ): Promise<void> {
+    await measure('kv.put', () => this.kv.put(key, value, options))
+  }
+
+  async delete (key: string): Promise<void> {
+    await measure('kv.delete', () => this.kv.delete(key))
+  }
+}
+
+export class LoggedCache implements Cache {
+  constructor (private readonly cache: Cache) {}
+
+  async match (request: RequestInfo, options?: CacheQueryOptions): Promise<Response | undefined> {
+    return await measure('cache.match', () => this.cache.match(request, options))
+  }
+
+  async delete (request: RequestInfo, options?: CacheQueryOptions): Promise<boolean> {
+    return await measure('cache.delete', () => this.cache.delete(request, options))
+  }
+
+  async put (request: RequestInfo, response: Response): Promise<void> {
+    await measure('cache.put', () => this.cache.put(request, response))
+  }
+}
diff --git a/workers/datalake/src/multipart.ts b/workers/datalake/src/multipart.ts
index c842bbe522..6b5558a816 100644
--- a/workers/datalake/src/multipart.ts
+++ b/workers/datalake/src/multipart.ts
@@ -14,8 +14,7 @@
 //
 
 import { error, json } from 'itty-router'
-import postgres from 'postgres'
-import * as db from './db'
+import db, { withPostgres } from './db'
 import { cacheControl } from './const'
 import { toUUID } from './encodings'
 import { selectStorage } from './storage'
@@ -85,8 +84,6 @@ export async function handleMultipartUploadComplete (
   env: Env,
   ctx: ExecutionContext
 ): Promise<Response> {
-  const sql = postgres(env.HYPERDRIVE.connectionString)
-
   const { workspace, name } = request
 
   const multipartKey = request.query?.key
@@ -108,17 +105,19 @@ export async function handleMultipartUploadComplete (
   const size = object.size ?? 0
   const filename = multipartKey as UUID
 
-  const data = await db.getData(sql, { hash, location })
-  if (data !== null) {
-    // blob already exists
-    await Promise.all([bucket.delete(filename), db.createBlob(sql, { workspace, name, hash, location })])
-  } else {
-    // Otherwise register a new hash and blob
-    await sql.begin((sql) => [
-      db.createData(sql, { hash, location, filename, type, size }),
-      db.createBlob(sql, { workspace, name, hash, location })
-    ])
-  }
+  await withPostgres(env, ctx, async (sql) => {
+    const data = await db.getData(sql, { hash, location })
+    if (data !== null) {
+      // blob already exists
+      await Promise.all([bucket.delete(filename), db.createBlob(sql, { workspace, name, hash, location })])
+    } else {
+      // Otherwise register a new hash and blob
+      await sql.begin((sql) => [
+        db.createData(sql, { hash, location, filename, type, size }),
+        db.createBlob(sql, { workspace, name, hash, location })
+      ])
+    }
+  })
 
   return new Response(null, { status: 204 })
 }
diff --git a/workers/datalake/src/s3.ts b/workers/datalake/src/s3.ts
index 0c8f4f46ac..062b7714e3 100644
--- a/workers/datalake/src/s3.ts
+++ b/workers/datalake/src/s3.ts
@@ -15,8 +15,7 @@
 
 import { AwsClient } from 'aws4fetch'
 import { error, json } from 'itty-router'
-import postgres from 'postgres'
-import * as db from './db'
+import db, { withPostgres } from './db'
 
 import { saveBlob } from './blob'
 import { type BlobRequest } from './types'
@@ -38,34 +37,35 @@ function getS3Client (payload: S3UploadPayload): AwsClient {
 
 export async function handleS3Blob (request: BlobRequest, env: Env, ctx: ExecutionContext): Promise<Response> {
   const { workspace, name } = request
-  const sql = postgres(env.HYPERDRIVE.connectionString)
 
   const payload = await request.json<S3UploadPayload>()
 
   const client = getS3Client(payload)
 
-  // Ensure the blob does not exist
-  const blob = await db.getBlob(sql, { workspace, name })
-  if (blob !== null) {
-    return new Response(null, { status: 200 })
-  }
+  return await withPostgres(env, ctx, async (sql) => {
+    // Ensure the blob does not exist
+    const blob = await db.getBlob(sql, { workspace, name })
+    if (blob !== null) {
+      return new Response(null, { status: 200 })
+    }
 
-  const object = await client.fetch(payload.url)
-  if (!object.ok || object.status !== 200) {
-    return error(object.status)
-  }
+    const object = await client.fetch(payload.url)
+    if (!object.ok || object.status !== 200) {
+      return error(object.status)
+    }
 
-  if (object.body === null) {
-    return error(400)
-  }
+    if (object.body === null) {
+      return error(400)
+    }
 
-  const contentType = object.headers.get('content-type') ?? 'application/octet-stream'
-  const contentLengthHeader = object.headers.get('content-length') ?? '0'
-  const lastModifiedHeader = object.headers.get('last-modified')
+    const contentType = object.headers.get('content-type') ?? 'application/octet-stream'
+    const contentLengthHeader = object.headers.get('content-length') ?? '0'
+    const lastModifiedHeader = object.headers.get('last-modified')
 
-  const contentLength = Number.parseInt(contentLengthHeader)
-  const lastModified = lastModifiedHeader !== null ? new Date(lastModifiedHeader).getTime() : Date.now()
+    const contentLength = Number.parseInt(contentLengthHeader)
+    const lastModified = lastModifiedHeader !== null ? new Date(lastModifiedHeader).getTime() : Date.now()
 
-  const result = await saveBlob(env, sql, object.body, contentLength, contentType, workspace, name, lastModified)
-  return json(result)
+    const result = await saveBlob(env, sql, object.body, contentLength, contentType, workspace, name, lastModified)
+    return json(result)
+  })
 }
diff --git a/workers/datalake/src/sign.ts b/workers/datalake/src/sign.ts
index 28f97f8f52..1ac7c6206c 100644
--- a/workers/datalake/src/sign.ts
+++ b/workers/datalake/src/sign.ts
@@ -96,7 +96,7 @@ export async function handleSignComplete (request: BlobRequest, env: Env, ctx: E
   }
 
   try {
-    await handleBlobUploaded(env, workspace, name, uuid)
+    await handleBlobUploaded(env, ctx, workspace, name, uuid)
   } catch (err) {
     const message = err instanceof Error ? err.message : String(err)
     console.error({ error: message, workspace, name, uuid })
diff --git a/workers/datalake/wrangler.toml b/workers/datalake/wrangler.toml
index f578e1f574..0b6989c4b0 100644
--- a/workers/datalake/wrangler.toml
+++ b/workers/datalake/wrangler.toml
@@ -1,7 +1,7 @@
 #:schema node_modules/wrangler/config-schema.json
 name = "datalake-worker"
 main = "src/index.ts"
-compatibility_date = "2024-07-01"
+compatibility_date = "2024-09-23"
 compatibility_flags = ["nodejs_compat"]
 keep_vars = true