mirror of
https://github.com/hcengineering/platform.git
synced 2024-11-22 03:14:40 +03:00
UBERF-8578: Fix extra stat call for storage adapter (#7132)
Signed-off-by: Andrey Sobolev <haiodo@gmail.com>
This commit is contained in:
parent
a05e2a31d9
commit
04c8c2ffa0
@ -28,17 +28,20 @@ export async function yDocFromStorage (
|
|||||||
ydoc?: YDoc
|
ydoc?: YDoc
|
||||||
): Promise<YDoc | undefined> {
|
): Promise<YDoc | undefined> {
|
||||||
// stat the object to ensure it exists, because read will throw an error in this case
|
// stat the object to ensure it exists, because read will throw an error in this case
|
||||||
const blob = await storageAdapter.stat(ctx, workspace, documentId)
|
try {
|
||||||
if (blob === undefined) {
|
const buffer = await storageAdapter.read(ctx, workspace, documentId)
|
||||||
return undefined
|
|
||||||
|
// no need to apply gc because we load existing document
|
||||||
|
// it is either already gc-ed, or gc not needed and it is disabled
|
||||||
|
ydoc ??= new YDoc({ guid: generateId(), gc: false })
|
||||||
|
|
||||||
|
return yDocFromBuffer(Buffer.concat(buffer as any), ydoc)
|
||||||
|
} catch (err: any) {
|
||||||
|
if (err.code === 'NoSuchKey') {
|
||||||
|
return undefined
|
||||||
|
}
|
||||||
|
throw err
|
||||||
}
|
}
|
||||||
|
|
||||||
// no need to apply gc because we load existing document
|
|
||||||
// it is either already gc-ed, or gc not needed and it is disabled
|
|
||||||
ydoc ??= new YDoc({ guid: generateId(), gc: false })
|
|
||||||
|
|
||||||
const buffer = await storageAdapter.read(ctx, workspace, documentId)
|
|
||||||
return yDocFromBuffer(Buffer.concat(buffer as any), ydoc)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/** @public */
|
/** @public */
|
||||||
|
@ -48,8 +48,8 @@ import type { Request, Response } from '@hcengineering/rpc'
|
|||||||
import type { Token } from '@hcengineering/server-token'
|
import type { Token } from '@hcengineering/server-token'
|
||||||
import { type Readable } from 'stream'
|
import { type Readable } from 'stream'
|
||||||
import type { DbAdapter, DomainHelper } from './adapter'
|
import type { DbAdapter, DomainHelper } from './adapter'
|
||||||
import { type StorageAdapter } from './storage'
|
|
||||||
import type { StatisticsElement } from './stats'
|
import type { StatisticsElement } from './stats'
|
||||||
|
import { type StorageAdapter } from './storage'
|
||||||
|
|
||||||
export interface ServerFindOptions<T extends Doc> extends FindOptions<T> {
|
export interface ServerFindOptions<T extends Doc> extends FindOptions<T> {
|
||||||
domain?: Domain // Allow to find for Doc's in specified domain only.
|
domain?: Domain // Allow to find for Doc's in specified domain only.
|
||||||
@ -519,6 +519,17 @@ export interface StorageConfig {
|
|||||||
port?: number
|
port?: number
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export class NoSuchKeyError extends Error {
|
||||||
|
code: string
|
||||||
|
constructor (
|
||||||
|
msg: string,
|
||||||
|
readonly cause?: any
|
||||||
|
) {
|
||||||
|
super(msg)
|
||||||
|
this.code = 'NoSuchKey'
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
export interface StorageConfiguration {
|
export interface StorageConfiguration {
|
||||||
default: string
|
default: string
|
||||||
storages: StorageConfig[]
|
storages: StorageConfig[]
|
||||||
|
@ -401,7 +401,7 @@ export function start (
|
|||||||
}
|
}
|
||||||
|
|
||||||
let blobInfo = await ctx.with(
|
let blobInfo = await ctx.with(
|
||||||
'notoken-stat',
|
'stat',
|
||||||
{ workspace: payload.workspace.name },
|
{ workspace: payload.workspace.name },
|
||||||
async (ctx) => await config.storageAdapter.stat(ctx, payload.workspace, uuid)
|
async (ctx) => await config.storageAdapter.stat(ctx, payload.workspace, uuid)
|
||||||
)
|
)
|
||||||
|
@ -29,6 +29,7 @@ import core, {
|
|||||||
} from '@hcengineering/core'
|
} from '@hcengineering/core'
|
||||||
import { getMetadata } from '@hcengineering/platform'
|
import { getMetadata } from '@hcengineering/platform'
|
||||||
import serverCore, {
|
import serverCore, {
|
||||||
|
NoSuchKeyError,
|
||||||
type BlobStorageIterator,
|
type BlobStorageIterator,
|
||||||
type ListBlobResult,
|
type ListBlobResult,
|
||||||
type StorageAdapter,
|
type StorageAdapter,
|
||||||
@ -316,21 +317,26 @@ export class S3Service implements StorageAdapter {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async doGet (ctx: MeasureContext, workspaceId: WorkspaceId, objectName: string, range?: string): Promise<Readable> {
|
async doGet (ctx: MeasureContext, workspaceId: WorkspaceId, objectName: string, range?: string): Promise<Readable> {
|
||||||
const res = await this.client.getObject({
|
try {
|
||||||
Bucket: this.getBucketId(workspaceId),
|
const res = await this.client.getObject({
|
||||||
Key: this.getDocumentKey(workspaceId, objectName),
|
Bucket: this.getBucketId(workspaceId),
|
||||||
Range: range
|
Key: this.getDocumentKey(workspaceId, objectName),
|
||||||
})
|
Range: range
|
||||||
|
})
|
||||||
|
|
||||||
const stream = res.Body?.transformToWebStream()
|
const stream = res.Body?.transformToWebStream()
|
||||||
|
|
||||||
if (stream !== undefined) {
|
if (stream !== undefined) {
|
||||||
return Readable.fromWeb(stream as ReadableStream<any>)
|
return Readable.fromWeb(stream as ReadableStream<any>)
|
||||||
} else {
|
} else {
|
||||||
const readable = new Readable()
|
const readable = new Readable()
|
||||||
readable._read = () => {}
|
readable._read = () => {}
|
||||||
readable.push(null)
|
readable.push(null)
|
||||||
return readable
|
return readable
|
||||||
|
}
|
||||||
|
} catch (err: any) {
|
||||||
|
// In case of error return undefined
|
||||||
|
throw new NoSuchKeyError(`${workspaceId.name} missing ${objectName}`, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -144,7 +144,7 @@ export class FallbackStorageAdapter implements StorageAdapter, StorageAdapterEx
|
|||||||
return result
|
return result
|
||||||
}
|
}
|
||||||
|
|
||||||
@withContext('aggregator-delete', {})
|
@withContext('fallback-delete', {})
|
||||||
async delete (ctx: MeasureContext, workspaceId: WorkspaceId): Promise<void> {
|
async delete (ctx: MeasureContext, workspaceId: WorkspaceId): Promise<void> {
|
||||||
for (const { adapter } of this.adapters) {
|
for (const { adapter } of this.adapters) {
|
||||||
if (await adapter.exists(ctx, workspaceId)) {
|
if (await adapter.exists(ctx, workspaceId)) {
|
||||||
@ -153,7 +153,7 @@ export class FallbackStorageAdapter implements StorageAdapter, StorageAdapterEx
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@withContext('aggregator-remove', {})
|
@withContext('fallback-remove', {})
|
||||||
async remove (ctx: MeasureContext, workspaceId: WorkspaceId, objectNames: string[]): Promise<void> {
|
async remove (ctx: MeasureContext, workspaceId: WorkspaceId, objectNames: string[]): Promise<void> {
|
||||||
// Group by provider and delegate into it.
|
// Group by provider and delegate into it.
|
||||||
for (const { adapter } of this.adapters) {
|
for (const { adapter } of this.adapters) {
|
||||||
@ -173,40 +173,30 @@ export class FallbackStorageAdapter implements StorageAdapter, StorageAdapterEx
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@withContext('aggregator-stat', {})
|
@withContext('fallback-stat', {})
|
||||||
async stat (ctx: MeasureContext, workspaceId: WorkspaceId, name: string): Promise<Blob | undefined> {
|
async stat (ctx: MeasureContext, workspaceId: WorkspaceId, objectName: string): Promise<Blob | undefined> {
|
||||||
const result = await this.findProvider(ctx, workspaceId, name)
|
|
||||||
if (result !== undefined) {
|
|
||||||
result.stat.provider = result.name
|
|
||||||
}
|
|
||||||
return result?.stat
|
|
||||||
}
|
|
||||||
|
|
||||||
@withContext('aggregator-get', {})
|
|
||||||
async get (ctx: MeasureContext, workspaceId: WorkspaceId, name: string): Promise<Readable> {
|
|
||||||
const result = await this.findProvider(ctx, workspaceId, name)
|
|
||||||
if (result === undefined) {
|
|
||||||
throw new NoSuchKeyError(`${workspaceId.name} missing ${name}`)
|
|
||||||
}
|
|
||||||
return await result.adapter.get(ctx, workspaceId, result.stat._id)
|
|
||||||
}
|
|
||||||
|
|
||||||
@withContext('find-provider', {})
|
|
||||||
private async findProvider (
|
|
||||||
ctx: MeasureContext,
|
|
||||||
workspaceId: WorkspaceId,
|
|
||||||
objectName: string
|
|
||||||
): Promise<{ name: string, adapter: StorageAdapter, stat: Blob } | undefined> {
|
|
||||||
// Group by provider and delegate into it.
|
|
||||||
for (const { name, adapter } of this.adapters) {
|
for (const { name, adapter } of this.adapters) {
|
||||||
const stat = await adapter.stat(ctx, workspaceId, objectName)
|
const stat = await adapter.stat(ctx, workspaceId, objectName)
|
||||||
if (stat !== undefined) {
|
if (stat !== undefined) {
|
||||||
return { name, adapter, stat }
|
stat.provider = name
|
||||||
|
return stat
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@withContext('aggregator-partial', {})
|
@withContext('fallback-get', {})
|
||||||
|
async get (ctx: MeasureContext, workspaceId: WorkspaceId, objectName: string): Promise<Readable> {
|
||||||
|
for (const { adapter } of this.adapters) {
|
||||||
|
try {
|
||||||
|
return await adapter.get(ctx, workspaceId, objectName)
|
||||||
|
} catch (err: any) {
|
||||||
|
// ignore
|
||||||
|
}
|
||||||
|
}
|
||||||
|
throw new NoSuchKeyError(`${workspaceId.name} missing ${objectName}`)
|
||||||
|
}
|
||||||
|
|
||||||
|
@withContext('fallback-partial', {})
|
||||||
async partial (
|
async partial (
|
||||||
ctx: MeasureContext,
|
ctx: MeasureContext,
|
||||||
workspaceId: WorkspaceId,
|
workspaceId: WorkspaceId,
|
||||||
@ -214,20 +204,26 @@ export class FallbackStorageAdapter implements StorageAdapter, StorageAdapterEx
|
|||||||
offset: number,
|
offset: number,
|
||||||
length?: number | undefined
|
length?: number | undefined
|
||||||
): Promise<Readable> {
|
): Promise<Readable> {
|
||||||
const result = await this.findProvider(ctx, workspaceId, objectName)
|
for (const { adapter } of this.adapters) {
|
||||||
if (result === undefined) {
|
try {
|
||||||
throw new NoSuchKeyError(`${workspaceId.name} missing ${objectName}`)
|
return await adapter.partial(ctx, workspaceId, objectName, offset, length)
|
||||||
|
} catch (err: any) {
|
||||||
|
// ignore
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return await result.adapter.partial(ctx, workspaceId, result.stat._id, offset, length)
|
throw new NoSuchKeyError(`${workspaceId.name} missing ${objectName}`)
|
||||||
}
|
}
|
||||||
|
|
||||||
@withContext('aggregator-read', {})
|
@withContext('fallback-read', {})
|
||||||
async read (ctx: MeasureContext, workspaceId: WorkspaceId, objectName: string): Promise<Buffer[]> {
|
async read (ctx: MeasureContext, workspaceId: WorkspaceId, objectName: string): Promise<Buffer[]> {
|
||||||
const result = await this.findProvider(ctx, workspaceId, objectName)
|
for (const { adapter } of this.adapters) {
|
||||||
if (result === undefined) {
|
try {
|
||||||
throw new NoSuchKeyError(`${workspaceId.name} missing ${objectName}`)
|
return await adapter.read(ctx, workspaceId, objectName)
|
||||||
|
} catch (err: any) {
|
||||||
|
// Ignore
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return await result.adapter.read(ctx, workspaceId, result.stat._id)
|
throw new NoSuchKeyError(`${workspaceId.name} missing ${objectName}`)
|
||||||
}
|
}
|
||||||
|
|
||||||
@withContext('aggregator-put', {})
|
@withContext('aggregator-put', {})
|
||||||
|
Loading…
Reference in New Issue
Block a user