UBERF-8329 Add missing methods to datalake adapter (#6784)

Signed-off-by: Alexander Onnikov <Alexander.Onnikov@xored.com>
This commit is contained in:
Alexander Onnikov 2024-10-02 17:12:21 +07:00 committed by GitHub
parent 99749f30fb
commit c9e41e92a2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 133 additions and 11 deletions

View File

@ -26,6 +26,14 @@ export interface ObjectMetadata {
size?: number size?: number
} }
/**
 * Metadata describing a stored object, as produced by `Client.statObject`
 * from the headers of a HEAD response.
 *
 * @public
 */
export interface StatObjectOutput {
  /** Content type of the object; `statObject` uses an empty string when the header is absent. */
  type: string
  /**
   * Modification time as returned by `Date.parse` on the `Last-Modified` header;
   * `statObject` uses 0 when the header is absent or unparseable.
   */
  lastModified: number
  /** Size taken from the `Content-Length` header; `statObject` uses 0 when absent or not a number. */
  size?: number
  /** Value of the `ETag` header; `statObject` uses an empty string when the header is absent. */
  etag?: string
}
/** @public */ /** @public */
export interface PutObjectOutput { export interface PutObjectOutput {
id: string id: string
@ -55,9 +63,19 @@ export class Client {
async getObject (ctx: MeasureContext, workspace: WorkspaceId, objectName: string): Promise<Readable> { async getObject (ctx: MeasureContext, workspace: WorkspaceId, objectName: string): Promise<Readable> {
const url = this.getObjectUrl(ctx, workspace, objectName) const url = this.getObjectUrl(ctx, workspace, objectName)
const response = await fetch(url)
let response
try {
response = await fetch(url)
} catch (err: any) {
ctx.error('network error', { error: err })
throw new Error(`Network error ${err}`)
}
if (!response.ok) { if (!response.ok) {
if (response.status === 404) {
throw new Error('Not Found')
}
throw new Error('HTTP error ' + response.status) throw new Error('HTTP error ' + response.status)
} }
@ -69,12 +87,90 @@ export class Client {
return Readable.from(response.body) return Readable.from(response.body)
} }
/**
 * Fetches a byte range of an object as a readable stream.
 *
 * @param ctx - measure context, used here for error logging
 * @param workspace - workspace that owns the object
 * @param objectName - name of the object to read
 * @param offset - zero-based position of the first byte to return
 * @param length - number of bytes to return; when omitted the range is
 * open-ended and runs to the end of the object. A `length` of 0 is not
 * special-cased and yields a range whose end precedes its start.
 * @returns a stream over the response body
 * @throws Error on network failure ('Network error ...'), on a 404 response
 * ('Not Found'), on any other non-success status ('HTTP error <status>'),
 * or when the response has no body ('Missing response body')
 */
async getPartialObject (
  ctx: MeasureContext,
  workspace: WorkspaceId,
  objectName: string,
  offset: number,
  length?: number
): Promise<Readable> {
  const url = this.getObjectUrl(ctx, workspace, objectName)

  // An HTTP byte range is "first-last" with BOTH positions inclusive, so the
  // end of the range is offset + length - 1, not the byte count itself.
  // Leaving the end empty ("bytes=N-") requests everything from the offset on.
  const lastByte = length !== undefined ? String(offset + length - 1) : ''
  const headers = {
    Range: `bytes=${offset}-${lastByte}`
  }

  let response
  try {
    response = await fetch(url, { headers })
  } catch (err: any) {
    ctx.error('network error', { error: err })
    throw new Error(`Network error ${err}`)
  }

  if (!response.ok) {
    if (response.status === 404) {
      throw new Error('Not Found')
    }
    throw new Error('HTTP error ' + response.status)
  }

  if (response.body == null) {
    ctx.error('bad datalake response', { objectName })
    throw new Error('Missing response body')
  }

  return Readable.from(response.body)
}
/**
 * Retrieves object metadata by issuing a HEAD request to the object URL.
 *
 * @param ctx - measure context, used here for error logging
 * @param workspace - workspace that owns the object
 * @param objectName - name of the object to inspect
 * @returns metadata derived from the response headers, or `undefined` when
 * the server answers 404
 * @throws Error on network failure ('Network error ...') or on any non-success
 * status other than 404 ('HTTP error <status>')
 */
async statObject (
  ctx: MeasureContext,
  workspace: WorkspaceId,
  objectName: string
): Promise<StatObjectOutput | undefined> {
  const target = this.getObjectUrl(ctx, workspace, objectName)

  let res
  try {
    res = await fetch(target, { method: 'HEAD' })
  } catch (err: any) {
    ctx.error('network error', { error: err })
    throw new Error(`Network error ${err}`)
  }

  // A missing object is reported to the caller as `undefined`, not as an error.
  if (res.status === 404) {
    return undefined
  }
  if (!res.ok) {
    throw new Error('HTTP error ' + res.status)
  }

  // Absent or malformed headers fall back to neutral values (0 / empty string).
  const modifiedAt = Date.parse(res.headers.get('Last-Modified') ?? '')
  const contentLength = parseInt(res.headers.get('Content-Length') ?? '0', 10)

  const stat: StatObjectOutput = {
    lastModified: isNaN(modifiedAt) ? 0 : modifiedAt,
    size: isNaN(contentLength) ? 0 : contentLength,
    type: res.headers.get('Content-Type') ?? '',
    etag: res.headers.get('ETag') ?? ''
  }
  return stat
}
async deleteObject (ctx: MeasureContext, workspace: WorkspaceId, objectName: string): Promise<void> { async deleteObject (ctx: MeasureContext, workspace: WorkspaceId, objectName: string): Promise<void> {
const url = this.getObjectUrl(ctx, workspace, objectName) const url = this.getObjectUrl(ctx, workspace, objectName)
const response = await fetch(url, { method: 'DELETE' }) let response
try {
response = await fetch(url, { method: 'DELETE' })
} catch (err: any) {
ctx.error('network error', { error: err })
throw new Error(`Network error ${err}`)
}
if (!response.ok) { if (!response.ok) {
if (response.status === 404) {
throw new Error('Not Found')
}
throw new Error('HTTP error ' + response.status) throw new Error('HTTP error ' + response.status)
} }
} }
@ -100,10 +196,13 @@ export class Client {
} }
form.append('file', stream, options) form.append('file', stream, options)
const response = await fetch(url, { let response
method: 'POST', try {
body: form response = await fetch(url, { method: 'POST', body: form })
}) } catch (err: any) {
ctx.error('network error', { error: err })
throw new Error(`Network error ${err}`)
}
if (!response.ok) { if (!response.ok) {
throw new Error('HTTP error ' + response.status) throw new Error('HTTP error ' + response.status)

View File

@ -13,7 +13,7 @@
// limitations under the License. // limitations under the License.
// //
import { withContext, type Blob, type MeasureContext, type WorkspaceId } from '@hcengineering/core' import core, { type Blob, type MeasureContext, type Ref, type WorkspaceId, withContext } from '@hcengineering/core'
import { import {
type BlobStorageIterator, type BlobStorageIterator,
@ -74,13 +74,36 @@ export class DatalakeService implements StorageAdapter {
@withContext('listStream') @withContext('listStream')
async listStream (ctx: MeasureContext, workspaceId: WorkspaceId): Promise<BlobStorageIterator> { async listStream (ctx: MeasureContext, workspaceId: WorkspaceId): Promise<BlobStorageIterator> {
throw new Error('not supported') return {
next: async () => [],
close: async () => {}
}
} }
@withContext('stat') @withContext('stat')
async stat (ctx: MeasureContext, workspaceId: WorkspaceId, objectName: string): Promise<Blob | undefined> { async stat (ctx: MeasureContext, workspaceId: WorkspaceId, objectName: string): Promise<Blob | undefined> {
// not supported try {
return undefined const result = await this.client.statObject(ctx, workspaceId, objectName)
if (result !== undefined) {
return {
provider: '',
_class: core.class.Blob,
_id: objectName as Ref<Blob>,
storageId: objectName,
contentType: result.type,
size: result.size ?? 0,
etag: result.etag ?? '',
space: core.space.Configuration,
modifiedBy: core.account.System,
modifiedOn: result.lastModified,
version: null
}
} else {
ctx.error('no object found', { objectName, workspaceId: workspaceId.name })
}
} catch (err) {
ctx.error('failed to stat object', { error: err, objectName, workspaceId: workspaceId.name })
}
} }
@withContext('get') @withContext('get')
@ -134,7 +157,7 @@ export class DatalakeService implements StorageAdapter {
offset: number, offset: number,
length?: number length?: number
): Promise<Readable> { ): Promise<Readable> {
throw new Error('not implemented') return await this.client.getPartialObject(ctx, workspaceId, objectName, offset, length)
} }
async getUrl (ctx: MeasureContext, workspaceId: WorkspaceId, objectName: string): Promise<string> { async getUrl (ctx: MeasureContext, workspaceId: WorkspaceId, objectName: string): Promise<string> {