UBERF-7422: Fix blob/storage (#5933)

Signed-off-by: Andrey Sobolev <haiodo@gmail.com>
This commit is contained in:
Andrey Sobolev 2024-06-28 01:21:20 +07:00 committed by GitHub
parent 80d22b556c
commit 524c5b4ab8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 67 additions and 13 deletions

View File

@ -1,6 +1,7 @@
import core, { import core, {
DOMAIN_BLOB, DOMAIN_BLOB,
groupByArray, groupByArray,
toIdMap,
withContext, withContext,
type Blob, type Blob,
type BlobLookup, type BlobLookup,
@ -76,11 +77,64 @@ export class AggregatorStorageAdapter implements StorageAdapter, StorageAdapterE
async initialize (ctx: MeasureContext, workspaceId: WorkspaceId): Promise<void> {} async initialize (ctx: MeasureContext, workspaceId: WorkspaceId): Promise<void> {}
/**
 * Reconcile a batch of listed blobs with the DOMAIN_BLOB database records.
 *
 * For every listed blob whose stored record is missing or stale (etag or size
 * differs), re-stat it via the storage adapter and rewrite the record.
 *
 * @param ctx - measurement/tracing context
 * @param workspaceId - workspace whose blob domain is being synced
 * @param docs - batch of blobs produced by the storage listing iterator
 */
async doSyncDocs (ctx: MeasureContext, workspaceId: WorkspaceId, docs: ListBlobResult[]): Promise<void> {
  // Nothing to reconcile — avoid issuing a useless `$in: []` query.
  if (docs.length === 0) {
    return
  }
  const existingBlobs = toIdMap(
    await this.dbAdapter.find<Blob>(ctx, workspaceId, DOMAIN_BLOB, { _id: { $in: docs.map((it) => it._id) } })
  )
  const toUpdate: Blob[] = []
  for (const d of docs) {
    const blobInfo = existingBlobs.get(d._id)
    // Only re-stat blobs whose record is absent or out of date.
    if (blobInfo === undefined || blobInfo.etag !== d.etag || blobInfo.size !== d.size) {
      const stat = await this.stat(ctx, workspaceId, d._id)
      if (stat !== undefined) {
        toUpdate.push(stat)
      }
    }
  }
  if (toUpdate.length > 0) {
    // Drop stale records first, then upload the freshly stat-ed versions.
    // (`.map()` already yields an array — no Array.from needed.)
    await this.dbAdapter.clean(ctx, workspaceId, DOMAIN_BLOB, toUpdate.map((it) => it._id))
    await this.dbAdapter.upload(ctx, workspaceId, DOMAIN_BLOB, toUpdate)
  }
}
/**
 * Iterate all blobs across every configured storage adapter, syncing the
 * DOMAIN_BLOB records in batches as documents stream by.
 */
find (ctx: MeasureContext, workspaceId: WorkspaceId): StorageIterator {
  const source = this.makeStorageIterator(ctx, workspaceId)
  let pending: ListBlobResult[] = []

  // Push the accumulated batch into the database and reset the buffer.
  const flush = async (ctx: MeasureContext): Promise<void> => {
    await this.doSyncDocs(ctx, workspaceId, pending)
    pending = []
  }

  return {
    next: async (ctx) => {
      const blob = await source.next()
      if (blob !== undefined) {
        pending.push(blob)
      }
      // Sync in batches of 51 to keep DB round-trips bounded.
      if (pending.length > 50) {
        await flush(ctx)
      }
      if (blob === undefined) {
        return undefined
      }
      return {
        hash: blob.etag,
        id: blob._id,
        size: blob.size
      }
    },
    close: async (ctx) => {
      // Sync whatever is left before shutting the underlying iterator down.
      if (pending.length > 0) {
        await flush(ctx)
      }
      await source.close()
    }
  }
}
private makeStorageIterator (ctx: MeasureContext, workspaceId: WorkspaceId): BlobStorageIterator {
const adapters = Array.from(this.adapters.values()) const adapters = Array.from(this.adapters.values())
let iterator: BlobStorageIterator | undefined let iterator: BlobStorageIterator | undefined
return { return {
next: async (ctx) => { next: async () => {
while (true) { while (true) {
if (iterator === undefined && adapters.length > 0) { if (iterator === undefined && adapters.length > 0) {
iterator = await (adapters.shift() as StorageAdapter).listStream(ctx, workspaceId) iterator = await (adapters.shift() as StorageAdapter).listStream(ctx, workspaceId)
@ -90,11 +144,8 @@ export class AggregatorStorageAdapter implements StorageAdapter, StorageAdapterE
} }
const docInfo = await iterator.next() const docInfo = await iterator.next()
if (docInfo !== undefined) { if (docInfo !== undefined) {
return { // We need to check if our stored version is fine
hash: docInfo.etag, return docInfo
id: docInfo._id,
size: docInfo.size
}
} else { } else {
// We need to take next adapter // We need to take next adapter
await iterator.close() await iterator.close()
@ -102,7 +153,7 @@ export class AggregatorStorageAdapter implements StorageAdapter, StorageAdapterE
} }
} }
}, },
close: async (ctx) => { close: async () => {
if (iterator !== undefined) { if (iterator !== undefined) {
await iterator.close() await iterator.close()
} }
@ -250,9 +301,12 @@ export class AggregatorStorageAdapter implements StorageAdapter, StorageAdapterE
selectProvider ( selectProvider (
forceProvider: string | undefined, forceProvider: string | undefined,
contentType: string contentType: string
): { adapter: StorageAdapter | undefined, provider: string } { ): { adapter: StorageAdapter, provider: string } {
if (forceProvider !== undefined) { if (forceProvider !== undefined) {
return { adapter: this.adapters.get(forceProvider), provider: forceProvider } return {
adapter: this.adapters.get(forceProvider) ?? (this.adapters.get(this.defaultAdapter) as StorageAdapter),
provider: forceProvider
}
} }
// try select provider based on content type matching. // try select provider based on content type matching.
for (const [provider, adapter] of this.adapters.entries()) { for (const [provider, adapter] of this.adapters.entries()) {
@ -265,7 +319,7 @@ export class AggregatorStorageAdapter implements StorageAdapter, StorageAdapterE
} }
} }
return { adapter: this.adapters.get(this.defaultAdapter), provider: this.defaultAdapter } return { adapter: this.adapters.get(this.defaultAdapter) as StorageAdapter, provider: this.defaultAdapter }
} }
@withContext('aggregator-put', {}) @withContext('aggregator-put', {})
@ -283,9 +337,6 @@ export class AggregatorStorageAdapter implements StorageAdapter, StorageAdapterE
).shift() ).shift()
const { provider, adapter } = this.selectProvider(stat?.provider, contentType) const { provider, adapter } = this.selectProvider(stat?.provider, contentType)
if (adapter === undefined) {
throw new NoSuchKeyError('No such provider found')
}
const result = await adapter.put(ctx, workspaceId, objectName, stream, contentType, size) const result = await adapter.put(ctx, workspaceId, objectName, stream, contentType, size)

View File

@ -65,6 +65,9 @@ class StorageBlobAdapter implements DbAdapter {
} }
/**
 * Iterate blob documents for the domain. When a recheck is requested,
 * delegate to the storage client's own iterator (which walks the actual
 * storage backends) instead of the plain blob db adapter.
 */
find (ctx: MeasureContext, domain: Domain, recheck?: boolean): StorageIterator {
  return recheck === true
    ? (this.client as StorageAdapterEx).find(ctx, this.workspaceId)
    : this.blobAdapter.find(ctx, domain, recheck)
}