fix: datalake issues (#7217)

Signed-off-by: Alexander Onnikov <Alexander.Onnikov@xored.com>
This commit is contained in:
Alexander Onnikov 2024-11-22 01:19:33 +07:00 committed by GitHub
parent 2d102207ee
commit 500a221ceb
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 79 additions and 25 deletions

View File

@ -18,6 +18,8 @@ import FormData from 'form-data'
import fetch, { type RequestInit, type Response } from 'node-fetch'
import { Readable } from 'stream'
import { DatalakeError, NetworkError, NotFoundError } from './error'
/** @public */
export interface ObjectMetadata {
lastModified: number
@ -62,14 +64,16 @@ export class Client {
let response
try {
response = await fetchSafe(ctx, url)
} catch (err) {
console.error('failed to get object', { workspace, objectName, err })
} catch (err: any) {
if (err.name !== 'NotFoundError') {
console.error('failed to get object', { workspace, objectName, err })
}
throw err
}
if (response.body == null) {
ctx.error('bad datalake response', { objectName })
throw new Error('Missing response body')
throw new DatalakeError('Missing response body')
}
return Readable.from(response.body)
@ -90,14 +94,16 @@ export class Client {
let response
try {
response = await fetchSafe(ctx, url, { headers })
} catch (err) {
console.error('failed to get partial object', { workspace, objectName, err })
} catch (err: any) {
if (err.name !== 'NotFoundError') {
console.error('failed to get partial object', { workspace, objectName, err })
}
throw err
}
if (response.body == null) {
ctx.error('bad datalake response', { objectName })
throw new Error('Missing response body')
throw new DatalakeError('Missing response body')
}
return Readable.from(response.body)
@ -113,7 +119,10 @@ export class Client {
let response: Response
try {
response = await fetchSafe(ctx, url, { method: 'HEAD' })
} catch (err) {
} catch (err: any) {
if (err.name === 'NotFoundError') {
return
}
console.error('failed to stat object', { workspace, objectName, err })
throw err
}
@ -134,9 +143,11 @@ export class Client {
const url = this.getObjectUrl(ctx, workspace, objectName)
try {
await fetchSafe(ctx, url, { method: 'DELETE' })
} catch (err) {
console.error('failed to delete object', { workspace, objectName, err })
throw err
} catch (err: any) {
if (err.name !== 'NotFoundError') {
console.error('failed to delete object', { workspace, objectName, err })
throw err
}
}
}
@ -187,7 +198,7 @@ export class Client {
const form = new FormData()
const options: FormData.AppendOptions = {
filename: encodeURIComponent(objectName),
filename: objectName,
contentType: metadata.type,
knownLength: metadata.size,
header: {
@ -200,13 +211,13 @@ export class Client {
const result = (await response.json()) as BlobUploadResult[]
if (result.length !== 1) {
throw new Error('Bad datalake response: ' + result.toString())
throw new DatalakeError('Bad datalake response: ' + result.toString())
}
const uploadResult = result[0]
if ('error' in uploadResult) {
throw new Error('Upload failed: ' + uploadResult.error)
throw new DatalakeError('Upload failed: ' + uploadResult.error)
}
}
@ -232,7 +243,7 @@ export class Client {
} catch (err) {
ctx.error('failed to upload via signed url', { workspace, objectName, err })
await this.signObjectDelete(ctx, workspace, objectName)
throw new Error('Failed to upload via signed URL')
throw new DatalakeError('Failed to upload via signed URL')
}
await this.signObjectComplete(ctx, workspace, objectName)
@ -245,7 +256,7 @@ export class Client {
return await response.text()
} catch (err: any) {
ctx.error('failed to sign object', { workspace, objectName, err })
throw new Error('Failed to sign URL')
throw new DatalakeError('Failed to sign URL')
}
}
@ -255,7 +266,7 @@ export class Client {
await fetchSafe(ctx, url, { method: 'PUT' })
} catch (err: any) {
ctx.error('failed to complete signed url upload', { workspace, objectName, err })
throw new Error('Failed to complete signed URL upload')
throw new DatalakeError('Failed to complete signed URL upload')
}
}
@ -265,7 +276,7 @@ export class Client {
await fetchSafe(ctx, url, { method: 'DELETE' })
} catch (err: any) {
ctx.error('failed to abort signed url upload', { workspace, objectName, err })
throw new Error('Failed to abort signed URL upload')
throw new DatalakeError('Failed to abort signed URL upload')
}
}
@ -281,12 +292,16 @@ async function fetchSafe (ctx: MeasureContext, url: string, init?: RequestInit):
response = await fetch(url, init)
} catch (err: any) {
ctx.error('network error', { err })
throw new Error(`Network error ${err}`)
throw new NetworkError(`Network error ${err}`)
}
if (!response.ok) {
const text = await response.text()
throw new Error(response.status === 404 ? 'Not Found' : 'HTTP error ' + response.status + ': ' + text)
if (response.status === 404) {
throw new NotFoundError(text)
} else {
throw new DatalakeError(text)
}
}
return response

View File

@ -0,0 +1,35 @@
//
// Copyright © 2024 Hardcore Engineering Inc.
//
// Licensed under the Eclipse Public License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License. You may
// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//
// See the License for the specific language governing permissions and
// limitations under the License.
//
export class NetworkError extends Error {
constructor (message: string) {
super(message)
this.name = 'NetworkError'
}
}
export class DatalakeError extends Error {
constructor (message: string) {
super(message)
this.name = 'DatalakeError'
}
}
export class NotFoundError extends DatalakeError {
constructor (message = 'Not Found') {
super(message)
this.name = 'NotFoundError'
}
}

View File

@ -35,9 +35,11 @@ export interface DatalakeConfig extends StorageConfig {
*/
export class DatalakeService implements StorageAdapter {
static config = 'datalake'
client: Client
private readonly client: Client
constructor (readonly opt: DatalakeConfig) {
this.client = new Client(opt.endpoint)
const endpoint = Number.isInteger(opt.port) ? `${opt.endpoint}:${opt.port}` : opt.endpoint
this.client = new Client(endpoint)
}
async initialize (ctx: MeasureContext, workspaceId: WorkspaceId): Promise<void> {}
@ -97,8 +99,6 @@ export class DatalakeService implements StorageAdapter {
modifiedOn: result.lastModified,
version: null
}
} else {
ctx.error('no object found', { objectName, workspaceId: workspaceId.name })
}
} catch (err) {
ctx.error('failed to stat object', { error: err, objectName, workspaceId: workspaceId.name })

View File

@ -85,7 +85,7 @@ export async function handleBlobHead (request: BlobRequest, env: Env, ctx: Execu
const { bucket } = selectStorage(env, workspace)
const blob = await db.getBlob(sql, { workspace, name })
if (blob === null) {
if (blob === null || blob.deleted) {
return error(404)
}

View File

@ -40,10 +40,14 @@ const withWorkspace: RequestHandler<WorkspaceRequest> = (request: WorkspaceReque
}
const withBlob: RequestHandler<BlobRequest> = (request: BlobRequest) => {
if (request.params.workspace === undefined || request.params.workspace === '') {
return error(400, 'Missing workspace')
}
if (request.params.name === undefined || request.params.name === '') {
return error(400, 'Missing blob name')
}
request.workspace = decodeURIComponent(request.params.name)
request.workspace = decodeURIComponent(request.params.workspace)
request.name = decodeURIComponent(request.params.name)
}
router