Mirror of https://github.com/hcengineering/platform.git (synced 2024-12-19 00:41:47 +03:00)
UBERF-8842 Add performance logging to datalake (#7395)

Signed-off-by: Alexander Onnikov <Alexander.Onnikov@xored.com>

parent 05a54a29eb
commit bb24943d45
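
In short: the datalake worker now wraps its R2, KV, cache, and Postgres operations in timing helpers (measure/measureSync plus Logged* decorator classes) that emit a structured { stage, duration } log entry per call, threads the ExecutionContext into the blob handlers, and replaces ad-hoc postgres() connections with a withPostgres helper. A minimal sketch of the pattern this commit introduces (the call site is illustrative):

    // Log how long a single datalake operation takes.
    const blob = await measure('db.getBlob', () => db.getBlob(sql, { workspace, name }))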
@@ -14,14 +14,15 @@
 //
 import { error, json } from 'itty-router'
-import postgres from 'postgres'
-import * as db from './db'
+import { type Sql } from 'postgres'
+import db, { withPostgres } from './db'
 import { cacheControl, hashLimit } from './const'
 import { toUUID } from './encodings'
 import { getSha256 } from './hash'
 import { selectStorage } from './storage'
 import { type BlobRequest, type WorkspaceRequest, type UUID } from './types'
 import { copyVideo, deleteVideo } from './video'
+import { measure, LoggedCache } from './measure'

 interface BlobMetadata {
   lastModified: number
@@ -38,20 +39,22 @@ export function getBlobURL (request: Request, workspace: string, name: string):
 export async function handleBlobGet (request: BlobRequest, env: Env, ctx: ExecutionContext): Promise<Response> {
   const { workspace, name } = request

-  const sql = postgres(env.HYPERDRIVE.connectionString)
-  const { bucket } = selectStorage(env, workspace)
-
-  const blob = await db.getBlob(sql, { workspace, name })
-  if (blob === null || blob.deleted) {
-    return error(404)
-  }
-
-  const cache = caches.default
+  const cache = new LoggedCache(caches.default)
   const cached = await cache.match(request)
   if (cached !== undefined) {
+    console.log({ message: 'cache hit' })
     return cached
   }

+  const { bucket } = selectStorage(env, workspace)
+
+  const blob = await withPostgres(env, ctx, (sql) => {
+    return db.getBlob(sql, { workspace, name })
+  })
+  if (blob === null || blob.deleted) {
+    return error(404)
+  }
+
   const range = request.headers.has('Range') ? request.headers : undefined
   const object = await bucket.get(blob.filename, { range })
   if (object === null) {
@@ -67,6 +70,7 @@ export async function handleBlobGet (request: BlobRequest, env: Env, ctx: Execut
   const status = length !== undefined && length < object.size ? 206 : 200

   const response = new Response(object?.body, { headers, status })
+
   if (response.status === 200) {
     ctx.waitUntil(cache.put(request, response.clone()))
   }
@@ -77,10 +81,11 @@ export async function handleBlobGet (request: BlobRequest, env: Env, ctx: Execut
 export async function handleBlobHead (request: BlobRequest, env: Env, ctx: ExecutionContext): Promise<Response> {
   const { workspace, name } = request

-  const sql = postgres(env.HYPERDRIVE.connectionString)
   const { bucket } = selectStorage(env, workspace)

-  const blob = await db.getBlob(sql, { workspace, name })
+  const blob = await withPostgres(env, ctx, (sql) => {
+    return db.getBlob(sql, { workspace, name })
+  })
   if (blob === null || blob.deleted) {
     return error(404)
   }
@@ -97,10 +102,10 @@ export async function handleBlobHead (request: BlobRequest, env: Env, ctx: Execu
 export async function handleBlobDelete (request: BlobRequest, env: Env, ctx: ExecutionContext): Promise<Response> {
   const { workspace, name } = request

-  const sql = postgres(env.HYPERDRIVE.connectionString)
-
   try {
-    await Promise.all([db.deleteBlob(sql, { workspace, name }), deleteVideo(env, workspace, name)])
+    await withPostgres(env, ctx, (sql) => {
+      return Promise.all([db.deleteBlob(sql, { workspace, name }), deleteVideo(env, workspace, name)])
+    })

     return new Response(null, { status: 204 })
   } catch (err: any) {
@@ -110,7 +115,11 @@ export async function handleBlobDelete (request: BlobRequest, env: Env, ctx: Exe
   }
 }

-export async function handleUploadFormData (request: WorkspaceRequest, env: Env): Promise<Response> {
+export async function handleUploadFormData (
+  request: WorkspaceRequest,
+  env: Env,
+  ctx: ExecutionContext
+): Promise<Response> {
   const contentType = request.headers.get('Content-Type')
   if (contentType === null || !contentType.includes('multipart/form-data')) {
     console.error({ error: 'expected multipart/form-data' })
@@ -119,11 +128,9 @@ export async function handleUploadFormData (request: WorkspaceRequest, env: Env)

   const { workspace } = request

-  const sql = postgres(env.HYPERDRIVE.connectionString)
-
   let formData: FormData
   try {
-    formData = await request.formData()
+    formData = await measure('fetch formdata', () => request.formData())
   } catch (err: any) {
     const message = err instanceof Error ? err.message : String(err)
     console.error({ error: 'failed to parse form data', message })
@@ -139,7 +146,9 @@ export async function handleUploadFormData (request: WorkspaceRequest, env: Env)
     files.map(async ([file, key]) => {
       const { name, type, lastModified } = file
       try {
-        const metadata = await saveBlob(env, sql, file.stream(), file.size, type, workspace, name, lastModified)
+        const metadata = await withPostgres(env, ctx, (sql) => {
+          return saveBlob(env, sql, file.stream(), file.size, type, workspace, name, lastModified)
+        })

         // TODO this probably should happen via queue, let it be here for now
         if (type.startsWith('video/')) {
@@ -161,7 +170,7 @@ export async function handleUploadFormData (request: WorkspaceRequest, env: Env)

 export async function saveBlob (
   env: Env,
-  sql: postgres.Sql,
+  sql: Sql,
   stream: ReadableStream,
   size: number,
   type: string,
@@ -179,6 +188,7 @@ export async function saveBlob (

   const hash = await getSha256(hashStream)
   const data = await db.getData(sql, { hash, location })
+
   if (data !== null) {
     // Lucky boy, nothing to upload, use existing blob
     await db.createBlob(sql, { workspace, name, hash, location })
@@ -212,8 +222,13 @@ export async function saveBlob (
   }
 }

-export async function handleBlobUploaded (env: Env, workspace: string, name: string, filename: UUID): Promise<void> {
-  const sql = postgres(env.HYPERDRIVE.connectionString)
+export async function handleBlobUploaded (
+  env: Env,
+  ctx: ExecutionContext,
+  workspace: string,
+  name: string,
+  filename: UUID
+): Promise<void> {
   const { location, bucket } = selectStorage(env, workspace)

   const object = await bucket.head(filename)
@@ -223,16 +238,20 @@ export async function handleBlobUploaded (env: Env, workspace: string, name: str

   const hash = object.checksums.md5 !== undefined ? digestToUUID(object.checksums.md5) : (crypto.randomUUID() as UUID)

-  const data = await db.getData(sql, { hash, location })
-  if (data !== null) {
-    await Promise.all([bucket.delete(filename), db.createBlob(sql, { workspace, name, hash, location })])
-  } else {
-    const size = object.size
-    const type = object.httpMetadata.contentType ?? 'application/octet-stream'
+  await withPostgres(env, ctx, async (sql) => {
+    const data = await db.getData(sql, { hash, location })
+    if (data !== null) {
+      await Promise.all([bucket.delete(filename), db.createBlob(sql, { workspace, name, hash, location })])
+    } else {
+      const size = object.size
+      const type = object.httpMetadata?.contentType ?? 'application/octet-stream'

-    await db.createData(sql, { hash, location, filename, type, size })
-    await db.createBlob(sql, { workspace, name, hash, location })
-  }
+      await sql.begin((sql) => [
+        db.createData(sql, { hash, location, filename, type, size }),
+        db.createBlob(sql, { workspace, name, hash, location })
+      ])
+    }
+  })
 }

 async function uploadLargeFile (
@@ -13,7 +13,8 @@
 // limitations under the License.
 //

-import type postgres from 'postgres'
+import postgres from 'postgres'
+import { measure, measureSync } from './measure'
 import { type Location, type UUID } from './types'

 export interface BlobDataId {
@@ -42,63 +43,94 @@ export interface BlobRecordWithFilename extends BlobRecord {
   filename: string
 }

-export async function getData (sql: postgres.Sql, dataId: BlobDataId): Promise<BlobDataRecord | null> {
-  const { hash, location } = dataId
-
-  const rows = await sql<BlobDataRecord[]>`
-    SELECT hash, location, filename, size, type
-    FROM blob.data
-    WHERE hash = ${hash} AND location = ${location}
-  `
-
-  if (rows.length > 0) {
-    return rows[0]
-  }
-
-  return null
-}
-
-export async function createData (sql: postgres.Sql, data: BlobDataRecord): Promise<void> {
-  const { hash, location, filename, size, type } = data
-
-  await sql`
-    UPSERT INTO blob.data (hash, location, filename, size, type)
-    VALUES (${hash}, ${location}, ${filename}, ${size}, ${type})
-  `
-}
-
-export async function getBlob (sql: postgres.Sql, blobId: BlobId): Promise<BlobRecordWithFilename | null> {
-  const { workspace, name } = blobId
-
-  const rows = await sql<BlobRecordWithFilename[]>`
-    SELECT b.workspace, b.name, b.hash, b.location, b.deleted, d.filename
-    FROM blob.blob AS b
-    JOIN blob.data AS d ON b.hash = d.hash AND b.location = d.location
-    WHERE b.workspace = ${workspace} AND b.name = ${name}
-  `
-
-  if (rows.length > 0) {
-    return rows[0]
-  }
-
-  return null
-}
-
-export async function createBlob (sql: postgres.Sql, blob: Omit<BlobRecord, 'filename' | 'deleted'>): Promise<void> {
-  const { workspace, name, hash, location } = blob
-
-  await sql`
-    UPSERT INTO blob.blob (workspace, name, hash, location, deleted)
-    VALUES (${workspace}, ${name}, ${hash}, ${location}, false)
-  `
-}
-
-export async function deleteBlob (sql: postgres.Sql, blob: BlobId): Promise<void> {
-  const { workspace, name } = blob
-
-  await sql`
-    UPDATE blob.blob
-    SET deleted = true
-    WHERE workspace = ${workspace} AND name = ${name}
-  `
-}
+export async function withPostgres<T> (
+  env: Env,
+  ctx: ExecutionContext,
+  fn: (sql: postgres.Sql) => Promise<T>
+): Promise<T> {
+  const sql = measureSync('db.connect', () => {
+    return postgres(env.HYPERDRIVE.connectionString)
+  })
+  try {
+    return await fn(sql)
+  } finally {
+    measureSync('db.close', () => {
+      ctx.waitUntil(sql.end({ timeout: 0 }))
+    })
+  }
+}
+
+export interface BlobDB {
+  getData: (sql: postgres.Sql, dataId: BlobDataId) => Promise<BlobDataRecord | null>
+  createData: (sql: postgres.Sql, data: BlobDataRecord) => Promise<void>
+  getBlob: (sql: postgres.Sql, blobId: BlobId) => Promise<BlobRecordWithFilename | null>
+  createBlob: (sql: postgres.Sql, blob: Omit<BlobRecord, 'filename' | 'deleted'>) => Promise<void>
+  deleteBlob: (sql: postgres.Sql, blob: BlobId) => Promise<void>
+}
+
+const db: BlobDB = {
+  async getData (sql: postgres.Sql, dataId: BlobDataId): Promise<BlobDataRecord | null> {
+    const { hash, location } = dataId
+    const rows = await sql<BlobDataRecord[]>`
+      SELECT hash, location, filename, size, type
+      FROM blob.data
+      WHERE hash = ${hash} AND location = ${location}
+    `
+    return rows.length > 0 ? rows[0] : null
+  },
+
+  async createData (sql: postgres.Sql, data: BlobDataRecord): Promise<void> {
+    const { hash, location, filename, size, type } = data
+
+    await sql`
+      UPSERT INTO blob.data (hash, location, filename, size, type)
+      VALUES (${hash}, ${location}, ${filename}, ${size}, ${type})
+    `
+  },
+
+  async getBlob (sql: postgres.Sql, blobId: BlobId): Promise<BlobRecordWithFilename | null> {
+    const { workspace, name } = blobId
+
+    const rows = await sql<BlobRecordWithFilename[]>`
+      SELECT b.workspace, b.name, b.hash, b.location, b.deleted, d.filename
+      FROM blob.blob AS b
+      JOIN blob.data AS d ON b.hash = d.hash AND b.location = d.location
+      WHERE b.workspace = ${workspace} AND b.name = ${name}
+    `

+    if (rows.length > 0) {
+      return rows[0]
+    }
+
+    return null
+  },
+
+  async createBlob (sql: postgres.Sql, blob: Omit<BlobRecord, 'filename' | 'deleted'>): Promise<void> {
+    const { workspace, name, hash, location } = blob
+
+    await sql`
+      UPSERT INTO blob.blob (workspace, name, hash, location, deleted)
+      VALUES (${workspace}, ${name}, ${hash}, ${location}, false)
+    `
+  },
+
+  async deleteBlob (sql: postgres.Sql, blob: BlobId): Promise<void> {
+    const { workspace, name } = blob
+
+    await sql`
+      UPDATE blob.blob
+      SET deleted = true
+      WHERE workspace = ${workspace} AND name = ${name}
+    `
+  }
+}
+
+export const measuredDb: BlobDB = {
+  getData: (sql, dataId) => measure('db.getData', () => db.getData(sql, dataId)),
+  createData: (sql, data) => measure('db.createData', () => db.createData(sql, data)),
+  getBlob: (sql, blobId) => measure('db.getBlob', () => db.getBlob(sql, blobId)),
+  createBlob: (sql, blob) => measure('db.createBlob', () => db.createBlob(sql, blob)),
+  deleteBlob: (sql, blob) => measure('db.deleteBlob', () => db.deleteBlob(sql, blob))
+}
+
+export default measuredDb
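
withPostgres gives every handler the same connection lifecycle: open the connection (timed as db.connect), run the callback, then hand sql.end({ timeout: 0 }) to ctx.waitUntil so teardown does not delay the response. A usage sketch, taken from the handleBlobGet change above (workspace and name come from the request):

    const blob = await withPostgres(env, ctx, (sql) => {
      return db.getBlob(sql, { workspace, name })
    })

Because the module's default export is measuredDb, every call site also gets a db.getData / db.getBlob / db.createBlob timing entry without further changes.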
@@ -18,6 +18,7 @@ import { type IRequestStrict, type RequestHandler, Router, error, html } from 'i

 import { handleBlobDelete, handleBlobGet, handleBlobHead, handleUploadFormData } from './blob'
 import { cors } from './cors'
+import { LoggedKVNamespace, LoggedR2Bucket, requestTimeAfter, requestTimeBefore } from './measure'
 import { handleImageGet } from './image'
 import { handleS3Blob } from './s3'
 import { handleVideoMetaGet } from './video'
@@ -35,8 +36,8 @@ const { preflight, corsify } = cors({
 })

 const router = Router<IRequestStrict, [Env, ExecutionContext], Response>({
-  before: [preflight],
-  finally: [corsify]
+  before: [preflight, requestTimeBefore],
+  finally: [corsify, requestTimeAfter]
 })

 const withWorkspace: RequestHandler<WorkspaceRequest> = (request: WorkspaceRequest) => {
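
Registering requestTimeBefore in before and requestTimeAfter in finally brackets the whole pipeline: the first handler stamps request.startTime, and the last one logs after corsify has run, so the resulting entry — roughly { stage: 'total', duration: 38.2 }, values illustrative — covers routing, handler work, and response post-processing. Both handlers are defined in the new measure.ts below.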
@@ -87,6 +88,19 @@ router
   .all('*', () => error(404))

 export default class DatalakeWorker extends WorkerEntrypoint<Env> {
+  constructor (ctx: ExecutionContext, env: Env) {
+    env = {
+      ...env,
+      datalake_blobs: new LoggedKVNamespace(env.datalake_blobs),
+      DATALAKE_APAC: new LoggedR2Bucket(env.DATALAKE_APAC),
+      DATALAKE_EEUR: new LoggedR2Bucket(env.DATALAKE_EEUR),
+      DATALAKE_WEUR: new LoggedR2Bucket(env.DATALAKE_WEUR),
+      DATALAKE_ENAM: new LoggedR2Bucket(env.DATALAKE_ENAM),
+      DATALAKE_WNAM: new LoggedR2Bucket(env.DATALAKE_WNAM)
+    }
+    super(ctx, env)
+  }
+
   async fetch (request: Request): Promise<Response> {
     return await router.fetch(request, this.env, this.ctx).catch(error)
   }
workers/datalake/src/measure.ts (new file, 177 lines)
@@ -0,0 +1,177 @@
+//
+// Copyright © 2024 Hardcore Engineering Inc.
+//
+// Licensed under the Eclipse Public License, Version 2.0 (the 'License');
+// you may not use this file except in compliance with the License. You may
+// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an 'AS IS' BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+import { type IRequest, type ResponseHandler, type RequestHandler } from 'itty-router'
+
+export async function measure<T> (label: string, fn: () => Promise<T>): Promise<T> {
+  const start = performance.now()
+  try {
+    return await fn()
+  } finally {
+    const duration = performance.now() - start
+    console.log({ stage: label, duration })
+  }
+}
+
+export function measureSync<T> (label: string, fn: () => T): T {
+  const start = performance.now()
+  try {
+    return fn()
+  } finally {
+    const duration = performance.now() - start
+    console.log({ stage: label, duration })
+  }
+}
+
+export const requestTimeBefore: RequestHandler<IRequest> = async (request: IRequest) => {
+  request.startTime = performance.now()
+}
+
+export const requestTimeAfter: ResponseHandler<Response> = async (response: Response, request: IRequest) => {
+  const duration = performance.now() - request.startTime
+  console.log({ stage: 'total', duration })
+}
+
+export class LoggedR2Bucket implements R2Bucket {
+  constructor (private readonly bucket: R2Bucket) {}
+
+  async head (key: string): Promise<R2Object | null> {
+    return await measure('r2.head', () => this.bucket.head(key))
+  }
+
+  async get (
+    key: string,
+    options?: R2GetOptions & {
+      onlyIf?: R2Conditional | Headers
+    }
+  ): Promise<R2ObjectBody | null> {
+    return await measure('r2.get', () => this.bucket.get(key, options))
+  }
+
+  async put (
+    key: string,
+    value: ReadableStream | ArrayBuffer | ArrayBufferView | string | null | Blob,
+    options?: R2PutOptions & {
+      onlyIf?: R2Conditional | Headers
+    }
+  ): Promise<R2Object> {
+    return await measure('r2.put', () => this.bucket.put(key, value, options))
+  }
+
+  async createMultipartUpload (key: string, options?: R2MultipartOptions): Promise<R2MultipartUpload> {
+    return await measure('r2.createMultipartUpload', () => this.bucket.createMultipartUpload(key, options))
+  }
+
+  resumeMultipartUpload (key: string, uploadId: string): R2MultipartUpload {
+    return measureSync('r2.resumeMultipartUpload', () => this.bucket.resumeMultipartUpload(key, uploadId))
+  }
+
+  async delete (keys: string | string[]): Promise<void> {
+    await measure('r2.delete', () => this.bucket.delete(keys))
+  }
+
+  async list (options?: R2ListOptions): Promise<R2Objects> {
+    return await measure('r2.list', () => this.bucket.list(options))
+  }
+}
+
+export class LoggedKVNamespace implements KVNamespace {
+  constructor (private readonly kv: KVNamespace) {}
+
+  get (key: string, options?: Partial<KVNamespaceGetOptions<undefined>>): Promise<string | null>
+  get (key: string, type: 'text'): Promise<string | null>
+  get<ExpectedValue = unknown>(key: string, type: 'json'): Promise<ExpectedValue | null>
+  get (key: string, type: 'arrayBuffer'): Promise<ArrayBuffer | null>
+  get (key: string, type: 'stream'): Promise<ReadableStream | null>
+  get (key: string, options?: KVNamespaceGetOptions<'text'>): Promise<string | null>
+  get<ExpectedValue = unknown>(key: string, options?: KVNamespaceGetOptions<'json'>): Promise<ExpectedValue | null>
+  get (key: string, options?: KVNamespaceGetOptions<'arrayBuffer'>): Promise<ArrayBuffer | null>
+  get (key: string, options?: KVNamespaceGetOptions<'stream'>): Promise<ReadableStream | null>
+  async get (key: string, options?: any): Promise<any> {
+    return await measure('kv.get', () => this.kv.get(key, options))
+  }
+
+  getWithMetadata<Metadata = unknown>(
+    key: string,
+    options?: Partial<KVNamespaceGetOptions<undefined>>
+  ): Promise<KVNamespaceGetWithMetadataResult<string, Metadata>>
+  getWithMetadata<Metadata = unknown>(
+    key: string,
+    type: 'text'
+  ): Promise<KVNamespaceGetWithMetadataResult<string, Metadata>>
+  getWithMetadata<ExpectedValue = unknown, Metadata = unknown>(
+    key: string,
+    type: 'json'
+  ): Promise<KVNamespaceGetWithMetadataResult<ExpectedValue, Metadata>>
+  getWithMetadata<Metadata = unknown>(
+    key: string,
+    type: 'arrayBuffer'
+  ): Promise<KVNamespaceGetWithMetadataResult<ArrayBuffer, Metadata>>
+  getWithMetadata<Metadata = unknown>(
+    key: string,
+    type: 'stream'
+  ): Promise<KVNamespaceGetWithMetadataResult<ReadableStream, Metadata>>
+  getWithMetadata<Metadata = unknown>(
+    key: string,
+    options?: KVNamespaceGetOptions<'text'>
+  ): Promise<KVNamespaceGetWithMetadataResult<string, Metadata>>
+  getWithMetadata<ExpectedValue = unknown, Metadata = unknown>(
+    key: string,
+    options?: KVNamespaceGetOptions<'json'>
+  ): Promise<KVNamespaceGetWithMetadataResult<ExpectedValue, Metadata>>
+  getWithMetadata<Metadata = unknown>(
+    key: string,
+    options?: KVNamespaceGetOptions<'arrayBuffer'>
+  ): Promise<KVNamespaceGetWithMetadataResult<ArrayBuffer, Metadata>>
+  getWithMetadata<Metadata = unknown>(
+    key: string,
+    options?: KVNamespaceGetOptions<'stream'>
+  ): Promise<KVNamespaceGetWithMetadataResult<ReadableStream, Metadata>>
+  async getWithMetadata (key: string, options?: any): Promise<any> {
+    return await measure('kv.getWithMetadata', () => this.kv.getWithMetadata(key, options))
+  }
+
+  async list<Metadata = unknown>(options?: KVNamespaceListOptions): Promise<KVNamespaceListResult<Metadata, string>> {
+    return await measure('kv.list', () => this.kv.list(options))
+  }
+
+  async put (
+    key: string,
+    value: string | ArrayBuffer | ArrayBufferView | ReadableStream,
+    options?: KVNamespacePutOptions
+  ): Promise<void> {
+    await measure('kv.put', () => this.kv.put(key, value))
+  }
+
+  async delete (key: string): Promise<void> {
+    await measure('kv.delete', () => this.kv.delete(key))
+  }
+}
+
+export class LoggedCache implements Cache {
+  constructor (private readonly cache: Cache) {}
+
+  async match (request: RequestInfo, options?: CacheQueryOptions): Promise<Response | undefined> {
+    return await measure('cache.match', () => this.cache.match(request, options))
+  }
+
+  async delete (request: RequestInfo, options?: CacheQueryOptions): Promise<boolean> {
+    return await measure('cache.delete', () => this.cache.delete(request, options))
+  }
+
+  async put (request: RequestInfo, response: Response): Promise<void> {
+    await measure('cache.put', () => this.cache.put(request, response))
+  }
+}
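
Every helper above funnels through the same console.log({ stage, duration }) shape, so the worker's log stream stays machine-parseable. For a cached GET you would expect entries roughly like (durations illustrative):

    { stage: 'cache.match', duration: 2.1 }
    { stage: 'total', duration: 6.8 }

and for a cache miss, additional db.connect / db.getBlob / r2.get / cache.put / db.close entries in between.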
@@ -14,8 +14,7 @@
 //

 import { error, json } from 'itty-router'
-import postgres from 'postgres'
-import * as db from './db'
+import db, { withPostgres } from './db'
 import { cacheControl } from './const'
 import { toUUID } from './encodings'
 import { selectStorage } from './storage'
@@ -85,8 +84,6 @@ export async function handleMultipartUploadComplete (
   env: Env,
   ctx: ExecutionContext
 ): Promise<Response> {
-  const sql = postgres(env.HYPERDRIVE.connectionString)
-
   const { workspace, name } = request

   const multipartKey = request.query?.key
@@ -108,17 +105,19 @@ export async function handleMultipartUploadComplete (
   const size = object.size ?? 0
   const filename = multipartKey as UUID

-  const data = await db.getData(sql, { hash, location })
-  if (data !== null) {
-    // blob already exists
-    await Promise.all([bucket.delete(filename), db.createBlob(sql, { workspace, name, hash, location })])
-  } else {
-    // Otherwise register a new hash and blob
-    await sql.begin((sql) => [
-      db.createData(sql, { hash, location, filename, type, size }),
-      db.createBlob(sql, { workspace, name, hash, location })
-    ])
-  }
+  await withPostgres(env, ctx, async (sql) => {
+    const data = await db.getData(sql, { hash, location })
+    if (data !== null) {
+      // blob already exists
+      await Promise.all([bucket.delete(filename), db.createBlob(sql, { workspace, name, hash, location })])
+    } else {
+      // Otherwise register a new hash and blob
+      await sql.begin((sql) => [
+        db.createData(sql, { hash, location, filename, type, size }),
+        db.createBlob(sql, { workspace, name, hash, location })
+      ])
+    }
+  })

   return new Response(null, { status: 204 })
 }
@@ -15,8 +15,7 @@

 import { AwsClient } from 'aws4fetch'
 import { error, json } from 'itty-router'
-import postgres from 'postgres'
-import * as db from './db'
+import db, { withPostgres } from './db'
 import { saveBlob } from './blob'
 import { type BlobRequest } from './types'
@@ -38,34 +37,35 @@ function getS3Client (payload: S3UploadPayload): AwsClient {

 export async function handleS3Blob (request: BlobRequest, env: Env, ctx: ExecutionContext): Promise<Response> {
   const { workspace, name } = request
-  const sql = postgres(env.HYPERDRIVE.connectionString)

   const payload = await request.json<S3UploadPayload>()

   const client = getS3Client(payload)

-  // Ensure the blob does not exist
-  const blob = await db.getBlob(sql, { workspace, name })
-  if (blob !== null) {
-    return new Response(null, { status: 200 })
-  }
+  return await withPostgres(env, ctx, async (sql) => {
+    // Ensure the blob does not exist
+    const blob = await db.getBlob(sql, { workspace, name })
+    if (blob !== null) {
+      return new Response(null, { status: 200 })
+    }

-  const object = await client.fetch(payload.url)
-  if (!object.ok || object.status !== 200) {
-    return error(object.status)
-  }
+    const object = await client.fetch(payload.url)
+    if (!object.ok || object.status !== 200) {
+      return error(object.status)
+    }

-  if (object.body === null) {
-    return error(400)
-  }
+    if (object.body === null) {
+      return error(400)
+    }

-  const contentType = object.headers.get('content-type') ?? 'application/octet-stream'
-  const contentLengthHeader = object.headers.get('content-length') ?? '0'
-  const lastModifiedHeader = object.headers.get('last-modified')
+    const contentType = object.headers.get('content-type') ?? 'application/octet-stream'
+    const contentLengthHeader = object.headers.get('content-length') ?? '0'
+    const lastModifiedHeader = object.headers.get('last-modified')

-  const contentLength = Number.parseInt(contentLengthHeader)
-  const lastModified = lastModifiedHeader !== null ? new Date(lastModifiedHeader).getTime() : Date.now()
+    const contentLength = Number.parseInt(contentLengthHeader)
+    const lastModified = lastModifiedHeader !== null ? new Date(lastModifiedHeader).getTime() : Date.now()

-  const result = await saveBlob(env, sql, object.body, contentLength, contentType, workspace, name, lastModified)
-  return json(result)
+    const result = await saveBlob(env, sql, object.body, contentLength, contentType, workspace, name, lastModified)
+    return json(result)
+  })
 }
@@ -96,7 +96,7 @@ export async function handleSignComplete (request: BlobRequest, env: Env, ctx: E
   }

   try {
-    await handleBlobUploaded(env, workspace, name, uuid)
+    await handleBlobUploaded(env, ctx, workspace, name, uuid)
   } catch (err) {
     const message = err instanceof Error ? err.message : String(err)
     console.error({ error: message, workspace, name, uuid })
@@ -1,7 +1,7 @@
 #:schema node_modules/wrangler/config-schema.json
 name = "datalake-worker"
 main = "src/index.ts"
-compatibility_date = "2024-07-01"
+compatibility_date = "2024-09-23"
 compatibility_flags = ["nodejs_compat"]
 keep_vars = true