UBERF-8546 Refactor datalake routing (#7051)

Signed-off-by: Alexander Onnikov <Alexander.Onnikov@xored.com>
This commit is contained in:
Alexander Onnikov 2024-10-28 12:59:24 +07:00 committed by GitHub
parent a2f9eb4990
commit cf6ff9293d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 123 additions and 102 deletions

View File

@ -18,7 +18,7 @@ import postgres from 'postgres'
import * as db from './db' import * as db from './db'
import { toUUID } from './encodings' import { toUUID } from './encodings'
import { selectStorage } from './storage' import { selectStorage } from './storage'
import { type UUID } from './types' import { type BlobRequest, type WorkspaceRequest, type UUID } from './types'
import { copyVideo, deleteVideo } from './video' import { copyVideo, deleteVideo } from './video'
const expires = 86400 const expires = 86400
@ -39,13 +39,9 @@ export function getBlobURL (request: Request, workspace: string, name: string):
return new URL(path, request.url).toString() return new URL(path, request.url).toString()
} }
export async function handleBlobGet ( export async function handleBlobGet (request: BlobRequest, env: Env, ctx: ExecutionContext): Promise<Response> {
request: Request, const { workspace, name } = request
env: Env,
ctx: ExecutionContext,
workspace: string,
name: string
): Promise<Response> {
const sql = postgres(env.HYPERDRIVE.connectionString) const sql = postgres(env.HYPERDRIVE.connectionString)
const { bucket } = selectStorage(env, workspace) const { bucket } = selectStorage(env, workspace)
@ -82,13 +78,9 @@ export async function handleBlobGet (
return response return response
} }
export async function handleBlobHead ( export async function handleBlobHead (request: BlobRequest, env: Env, ctx: ExecutionContext): Promise<Response> {
request: Request, const { workspace, name } = request
env: Env,
ctx: ExecutionContext,
workspace: string,
name: string
): Promise<Response> {
const sql = postgres(env.HYPERDRIVE.connectionString) const sql = postgres(env.HYPERDRIVE.connectionString)
const { bucket } = selectStorage(env, workspace) const { bucket } = selectStorage(env, workspace)
@ -106,7 +98,9 @@ export async function handleBlobHead (
return new Response(null, { headers, status: 200 }) return new Response(null, { headers, status: 200 })
} }
export async function deleteBlob (env: Env, workspace: string, name: string): Promise<Response> { export async function handleBlobDelete (request: BlobRequest, env: Env, ctx: ExecutionContext): Promise<Response> {
const { workspace, name } = request
const sql = postgres(env.HYPERDRIVE.connectionString) const sql = postgres(env.HYPERDRIVE.connectionString)
try { try {
@ -120,13 +114,15 @@ export async function deleteBlob (env: Env, workspace: string, name: string): Pr
} }
} }
export async function postBlobFormData (request: Request, env: Env, workspace: string): Promise<Response> { export async function handleUploadFormData (request: WorkspaceRequest, env: Env): Promise<Response> {
const contentType = request.headers.get('Content-Type') const contentType = request.headers.get('Content-Type')
if (contentType === null || !contentType.includes('multipart/form-data')) { if (contentType === null || !contentType.includes('multipart/form-data')) {
console.error({ error: 'expected multipart/form-data' }) console.error({ error: 'expected multipart/form-data' })
return error(400, 'expected multipart/form-data') return error(400, 'expected multipart/form-data')
} }
const { workspace } = request
const sql = postgres(env.HYPERDRIVE.connectionString) const sql = postgres(env.HYPERDRIVE.connectionString)
let formData: FormData let formData: FormData

View File

@ -14,15 +14,17 @@
// //
import { getBlobURL } from './blob' import { getBlobURL } from './blob'
import { type BlobRequest } from './types'
const prefferedImageFormats = ['webp', 'avif', 'jpeg', 'png'] const prefferedImageFormats = ['webp', 'avif', 'jpeg', 'png']
export async function getImage ( export async function handleImageGet (request: BlobRequest): Promise<Response> {
request: Request, const {
workspace: string, workspace,
name: string, name,
transform: string params: { transform }
): Promise<Response> { } = request
const Accept = request.headers.get('Accept') ?? 'image/*' const Accept = request.headers.get('Accept') ?? 'image/*'
const image: Record<string, string> = {} const image: Record<string, string> = {}

View File

@ -13,53 +13,53 @@
// limitations under the License. // limitations under the License.
// //
import { type IRequest, Router, error, html } from 'itty-router' import { WorkerEntrypoint } from 'cloudflare:workers'
import { import { type IRequestStrict, type RequestHandler, Router, error, html } from 'itty-router'
deleteBlob as handleBlobDelete,
handleBlobGet, import { handleBlobDelete, handleBlobGet, handleBlobHead, handleUploadFormData } from './blob'
handleBlobHead,
postBlobFormData as handleUploadFormData
} from './blob'
import { cors } from './cors' import { cors } from './cors'
import { getImage as handleImageGet } from './image' import { handleImageGet } from './image'
import { getVideoMeta as handleVideoMetaGet } from './video' import { handleVideoMetaGet } from './video'
import { handleSignAbort, handleSignComplete, handleSignCreate } from './sign' import { handleSignAbort, handleSignComplete, handleSignCreate } from './sign'
import { type BlobRequest, type WorkspaceRequest } from './types'
const { preflight, corsify } = cors({ const { preflight, corsify } = cors({
maxAge: 86400 maxAge: 86400
}) })
export default { const router = Router<IRequestStrict, [Env, ExecutionContext], Response>({
async fetch (request, env, ctx): Promise<Response> {
const router = Router<IRequest>({
before: [preflight], before: [preflight],
finally: [corsify] finally: [corsify]
}) })
/**
 * Route middleware for paths that carry a `:workspace` parameter.
 *
 * Copies the URL-decoded `workspace` path parameter onto the request.
 * Responds with 400 when the parameter is absent or empty; otherwise
 * returns nothing.
 */
const withWorkspace: RequestHandler<WorkspaceRequest> = (request: WorkspaceRequest) => {
  const encoded = request.params.workspace
  const present = encoded !== undefined && encoded !== ''
  if (!present) {
    return error(400, 'Missing workspace')
  }

  request.workspace = decodeURIComponent(encoded)
}
/**
 * Route middleware for paths that carry `:workspace` and `:name` parameters.
 *
 * Validates that both path parameters are present and stores their
 * URL-decoded values on the request as `workspace` and `name`, which is what
 * the blob handlers destructure. Responds with 400 when either parameter is
 * absent or empty; otherwise returns nothing.
 */
const withBlob: RequestHandler<BlobRequest> = (request: BlobRequest) => {
  const { workspace, name } = request.params
  if (workspace === undefined || workspace === '') {
    return error(400, 'Missing workspace')
  }
  if (name === undefined || name === '') {
    return error(400, 'Missing blob name')
  }
  // Each field must come from its own path parameter: previously the decoded
  // blob name was written into `workspace` and `name` was never populated.
  request.workspace = decodeURIComponent(workspace)
  request.name = decodeURIComponent(name)
}
router router
.get('/blob/:workspace/:name', ({ params }) => handleBlobGet(request, env, ctx, params.workspace, params.name)) .get('/blob/:workspace/:name', withBlob, handleBlobGet)
.head('/blob/:workspace/:name', ({ params }) => handleBlobHead(request, env, ctx, params.workspace, params.name)) .head('/blob/:workspace/:name', withBlob, handleBlobHead)
.delete('/blob/:workspace/:name', ({ params }) => handleBlobDelete(env, params.workspace, params.name)) .delete('/blob/:workspace/:name', withBlob, handleBlobDelete)
// Image // Image
.get('/image/:transform/:workspace/:name', ({ params }) => .get('/image/:transform/:workspace/:name', withBlob, handleImageGet)
handleImageGet(request, params.workspace, params.name, params.transform)
)
// Video // Video
.get('/video/:workspace/:name/meta', ({ params }) => .get('/video/:workspace/:name/meta', withBlob, handleVideoMetaGet)
handleVideoMetaGet(request, env, ctx, params.workspace, params.name)
)
// Form Data // Form Data
.post('/upload/form-data/:workspace', ({ params }) => handleUploadFormData(request, env, params.workspace)) .post('/upload/form-data/:workspace', withWorkspace, handleUploadFormData)
// Signed URL // Signed URL
.post('/upload/signed-url/:workspace/:name', ({ params }) => .post('/upload/signed-url/:workspace/:name', withBlob, handleSignCreate)
handleSignCreate(request, env, ctx, params.workspace, params.name) .put('/upload/signed-url/:workspace/:name', withBlob, handleSignComplete)
) .delete('/upload/signed-url/:workspace/:name', withBlob, handleSignAbort)
.put('/upload/signed-url/:workspace/:name', ({ params }) =>
handleSignComplete(request, env, ctx, params.workspace, params.name)
)
.delete('/upload/signed-url/:workspace/:name', ({ params }) =>
handleSignAbort(request, env, ctx, params.workspace, params.name)
)
.all('/', () => .all('/', () =>
html( html(
`Huly&reg; Datalake&trade; <a href="https://huly.io">https://huly.io</a> `Huly&reg; Datalake&trade; <a href="https://huly.io">https://huly.io</a>
@ -68,6 +68,35 @@ export default {
) )
.all('*', () => error(404)) .all('*', () => error(404))
return await router.fetch(request).catch(error) export default class DatalakeWorker extends WorkerEntrypoint<Env> {
/**
 * HTTP entry point of the worker.
 *
 * Forwards the incoming request to the module-level router, passing this
 * worker's `env` and `ctx` as the extra handler arguments. If the router
 * promise rejects, the rejection reason is handed to itty-router's `error`
 * helper and its result is returned instead.
 */
async fetch (request: Request): Promise<Response> {
return await router.fetch(request, this.env, this.ctx).catch(error)
}
/**
 * Reads a blob by dispatching an internal `GET /blob/:workspace/:name`
 * request through the router.
 *
 * @param workspace - workspace identifier (unencoded)
 * @param name - blob name (unencoded)
 * @returns the response body as an ArrayBuffer
 * @throws Error when the router responds with a non-2xx status
 */
async getBlob (workspace: string, name: string): Promise<ArrayBuffer> {
  // The route middleware runs decodeURIComponent on path parameters, so the
  // segments are percent-encoded here; this also keeps characters such as '/'
  // from being read as path separators.
  const url = `https://datalake/blob/${encodeURIComponent(workspace)}/${encodeURIComponent(name)}`
  const request = new Request(url)
  // Handlers take (request, env, ctx); without env and ctx they would
  // dereference undefined.
  const response = await router.fetch(request, this.env, this.ctx)

  if (!response.ok) {
    console.error({ error: 'datalake error: ' + response.statusText, workspace, name })
    throw new Error(`Failed to fetch blob: ${response.statusText}`)
  }

  return await response.arrayBuffer()
}
/**
 * Stores a blob by dispatching an internal multipart
 * `POST /upload/form-data/:workspace` request through the router.
 *
 * @param workspace - workspace identifier (unencoded)
 * @param name - blob name, sent as the filename of the `file` form field
 * @param data - blob contents
 * @param type - MIME type assigned to the uploaded blob
 * @throws Error when the router responds with a non-2xx status
 */
async putBlob (workspace: string, name: string, data: ArrayBuffer | Blob | string, type: string): Promise<void> {
  const body = new FormData()
  body.set('file', new Blob([data], { type }), name)

  // Method and body belong on the Request itself. Previously they were passed
  // as the second argument of router.fetch, i.e. in the `env` position, so the
  // request went out as a body-less GET.
  const url = `https://datalake/upload/form-data/${encodeURIComponent(workspace)}`
  const request = new Request(url, { method: 'POST', body })
  const response = await router.fetch(request, this.env, this.ctx)

  if (!response.ok) {
    console.error({ error: 'datalake error: ' + response.statusText, workspace, name })
    throw new Error(`Failed to upload blob: ${response.statusText}`)
  }
}
} }
} satisfies ExportedHandler<Env>

View File

@ -17,7 +17,7 @@ import { AwsClient } from 'aws4fetch'
import { error } from 'itty-router' import { error } from 'itty-router'
import { handleBlobUploaded } from './blob' import { handleBlobUploaded } from './blob'
import { type UUID } from './types' import { type BlobRequest, type UUID } from './types'
import { selectStorage, type Storage } from './storage' import { selectStorage, type Storage } from './storage'
const S3_SIGNED_LINK_TTL = 3600 const S3_SIGNED_LINK_TTL = 3600
@ -39,13 +39,8 @@ function getS3Client (storage: Storage): AwsClient {
}) })
} }
export async function handleSignCreate ( export async function handleSignCreate (request: BlobRequest, env: Env, ctx: ExecutionContext): Promise<Response> {
request: Request, const { workspace, name } = request
env: Env,
ctx: ExecutionContext,
workspace: string,
name: string
): Promise<Response> {
const storage = selectStorage(env, workspace) const storage = selectStorage(env, workspace)
const accountId = env.R2_ACCOUNT_ID const accountId = env.R2_ACCOUNT_ID
@ -78,13 +73,9 @@ export async function handleSignCreate (
return new Response(signed.url, { status: 200, headers }) return new Response(signed.url, { status: 200, headers })
} }
export async function handleSignComplete ( export async function handleSignComplete (request: BlobRequest, env: Env, ctx: ExecutionContext): Promise<Response> {
request: Request, const { workspace, name } = request
env: Env,
ctx: ExecutionContext,
workspace: string,
name: string
): Promise<Response> {
const { bucket } = selectStorage(env, workspace) const { bucket } = selectStorage(env, workspace)
const key = signBlobKey(workspace, name) const key = signBlobKey(workspace, name)
@ -117,13 +108,9 @@ export async function handleSignComplete (
return new Response(null, { status: 201 }) return new Response(null, { status: 201 })
} }
export async function handleSignAbort ( export async function handleSignAbort (request: BlobRequest, env: Env, ctx: ExecutionContext): Promise<Response> {
request: Request, const { workspace, name } = request
env: Env,
ctx: ExecutionContext,
workspace: string,
name: string
): Promise<Response> {
const key = signBlobKey(workspace, name) const key = signBlobKey(workspace, name)
// Check if the blob has been uploaded // Check if the blob has been uploaded

View File

@ -13,10 +13,21 @@
// limitations under the License. // limitations under the License.
// //
import { type IRequestStrict } from 'itty-router'
export type Location = 'weur' | 'eeur' | 'wnam' | 'enam' | 'apac' export type Location = 'weur' | 'eeur' | 'wnam' | 'enam' | 'apac'
export type UUID = string & { __uuid: true } export type UUID = string & { __uuid: true }
/**
 * Router request extended with a `workspace` field.
 *
 * NOTE(review): `workspace` is expected to be filled in by routing middleware
 * from the `:workspace` path parameter before a handler reads it — confirm
 * against the router setup.
 */
export type WorkspaceRequest = {
workspace: string
} & IRequestStrict
/**
 * Router request extended with `workspace` and `name` fields that identify a
 * single blob.
 *
 * NOTE(review): both fields are expected to be filled in by routing middleware
 * from the `:workspace` and `:name` path parameters before a handler reads
 * them — confirm against the router setup.
 */
export type BlobRequest = {
workspace: string
name: string
} & IRequestStrict
export interface CloudflareResponse { export interface CloudflareResponse {
success: boolean success: boolean
errors: any errors: any

View File

@ -15,7 +15,7 @@
import { error, json } from 'itty-router' import { error, json } from 'itty-router'
import { type CloudflareResponse, type StreamUploadResponse } from './types' import { type BlobRequest, type CloudflareResponse, type StreamUploadResponse } from './types'
export type StreamUploadState = 'ready' | 'error' | 'inprogress' | 'queued' | 'downloading' | 'pendingupload' export type StreamUploadState = 'ready' | 'error' | 'inprogress' | 'queued' | 'downloading' | 'pendingupload'
@ -42,13 +42,9 @@ function streamBlobKey (workspace: string, name: string): string {
return `v/${workspace}/${name}` return `v/${workspace}/${name}`
} }
export async function getVideoMeta ( export async function handleVideoMetaGet (request: BlobRequest, env: Env, ctx: ExecutionContext): Promise<Response> {
request: Request, const { workspace, name } = request
env: Env,
ctx: ExecutionContext,
workspace: string,
name: string
): Promise<Response> {
const key = streamBlobKey(workspace, name) const key = streamBlobKey(workspace, name)
const streamInfo = await env.datalake_blobs.get<StreamBlobInfo>(key, { type: 'json' }) const streamInfo = await env.datalake_blobs.get<StreamBlobInfo>(key, { type: 'json' })