diff --git a/server/core/src/server/aggregator.ts b/server/core/src/server/aggregator.ts index 4cab44dee3..90cb053a2b 100644 --- a/server/core/src/server/aggregator.ts +++ b/server/core/src/server/aggregator.ts @@ -1,6 +1,7 @@ import core, { DOMAIN_BLOB, groupByArray, + toIdMap, withContext, type Blob, type BlobLookup, @@ -76,11 +77,64 @@ export class AggregatorStorageAdapter implements StorageAdapter, StorageAdapterE async initialize (ctx: MeasureContext, workspaceId: WorkspaceId): Promise {} + async doSyncDocs (ctx: MeasureContext, workspaceId: WorkspaceId, docs: ListBlobResult[]): Promise { + const existingBlobs = toIdMap( + await this.dbAdapter.find(ctx, workspaceId, DOMAIN_BLOB, { _id: { $in: docs.map((it) => it._id) } }) + ) + const toUpdate: Blob[] = [] + for (const d of docs) { + const blobInfo = existingBlobs.get(d._id) + if (blobInfo === undefined || blobInfo.etag !== d.etag || blobInfo.size !== d.size) { + const stat = await this.stat(ctx, workspaceId, d._id) + if (stat !== undefined) { + toUpdate.push(stat) + } + } + } + if (toUpdate.length > 0) { + await this.dbAdapter.clean(ctx, workspaceId, DOMAIN_BLOB, Array.from(toUpdate.map((it) => it._id))) + await this.dbAdapter.upload(ctx, workspaceId, DOMAIN_BLOB, toUpdate) + } + } + find (ctx: MeasureContext, workspaceId: WorkspaceId): StorageIterator { + const storageIterator = this.makeStorageIterator(ctx, workspaceId) + + let buffer: ListBlobResult[] = [] + + return { + next: async (ctx) => { + const docInfo = await storageIterator.next() + if (docInfo !== undefined) { + buffer.push(docInfo) + } + if (buffer.length > 50) { + await this.doSyncDocs(ctx, workspaceId, buffer) + + buffer = [] + } + if (docInfo !== undefined) { + return { + hash: docInfo.etag, + id: docInfo._id, + size: docInfo.size + } + } + }, + close: async (ctx) => { + if (buffer.length > 0) { + await this.doSyncDocs(ctx, workspaceId, buffer) + } + await storageIterator.close() + } + } + } + + private makeStorageIterator (ctx: MeasureContext, workspaceId: WorkspaceId): BlobStorageIterator { const adapters = Array.from(this.adapters.values()) let iterator: BlobStorageIterator | undefined return { - next: async (ctx) => { + next: async () => { while (true) { if (iterator === undefined && adapters.length > 0) { iterator = await (adapters.shift() as StorageAdapter).listStream(ctx, workspaceId) @@ -90,11 +144,8 @@ export class AggregatorStorageAdapter implements StorageAdapter, StorageAdapterE } const docInfo = await iterator.next() if (docInfo !== undefined) { - return { - hash: docInfo.etag, - id: docInfo._id, - size: docInfo.size - } + // We need to check if our stored version is fine + return docInfo } else { // We need to take next adapter await iterator.close() @@ -102,7 +153,7 @@ export class AggregatorStorageAdapter implements StorageAdapter, StorageAdapterE } } }, - close: async (ctx) => { + close: async () => { if (iterator !== undefined) { await iterator.close() } @@ -250,9 +301,12 @@ export class AggregatorStorageAdapter implements StorageAdapter, StorageAdapterE selectProvider ( forceProvider: string | undefined, contentType: string - ): { adapter: StorageAdapter | undefined, provider: string } { + ): { adapter: StorageAdapter, provider: string } { if (forceProvider !== undefined) { - return { adapter: this.adapters.get(forceProvider), provider: forceProvider } + return { + adapter: this.adapters.get(forceProvider) ?? (this.adapters.get(this.defaultAdapter) as StorageAdapter), + provider: forceProvider + } } // try select provider based on content type matching. for (const [provider, adapter] of this.adapters.entries()) { @@ -265,7 +319,7 @@ export class AggregatorStorageAdapter implements StorageAdapter, StorageAdapterE } } - return { adapter: this.adapters.get(this.defaultAdapter), provider: this.defaultAdapter } + return { adapter: this.adapters.get(this.defaultAdapter) as StorageAdapter, provider: this.defaultAdapter } } @withContext('aggregator-put', {}) @@ -283,9 +337,6 @@ export class AggregatorStorageAdapter implements StorageAdapter, StorageAdapterE ).shift() const { provider, adapter } = this.selectProvider(stat?.provider, contentType) - if (adapter === undefined) { - throw new NoSuchKeyError('No such provider found') - } const result = await adapter.put(ctx, workspaceId, objectName, stream, contentType, size) diff --git a/server/server-storage/src/blobStorage.ts b/server/server-storage/src/blobStorage.ts index 34911dab32..ac615c82fb 100644 --- a/server/server-storage/src/blobStorage.ts +++ b/server/server-storage/src/blobStorage.ts @@ -65,6 +65,9 @@ class StorageBlobAdapter implements DbAdapter { } find (ctx: MeasureContext, domain: Domain, recheck?: boolean): StorageIterator { + if (recheck === true) { + return (this.client as StorageAdapterEx).find(ctx, this.workspaceId) + } return this.blobAdapter.find(ctx, domain, recheck) }