diff --git a/models/core/src/migration.ts b/models/core/src/migration.ts index 8b3130a753..69d30408ca 100644 --- a/models/core/src/migration.ts +++ b/models/core/src/migration.ts @@ -337,7 +337,7 @@ async function migrateBlobData (exAdapter: StorageAdapterEx, client: MigrationCl if (!(await adapter.exists(ctx, client.workspaceId))) { continue } - const blobs = await adapter.listStream(ctx, client.workspaceId, '') + const blobs = await adapter.listStream(ctx, client.workspaceId) const bulk = new Map, Blob>() try { const push = async (force: boolean): Promise => { diff --git a/packages/storage/src/index.ts b/packages/storage/src/index.ts index 01fa3e72e2..95a0e97cc0 100644 --- a/packages/storage/src/index.ts +++ b/packages/storage/src/index.ts @@ -45,7 +45,7 @@ export interface StorageAdapter { listBuckets: (ctx: MeasureContext) => Promise remove: (ctx: MeasureContext, workspaceId: WorkspaceId, objectNames: string[]) => Promise - listStream: (ctx: MeasureContext, workspaceId: WorkspaceId, prefix?: string) => Promise + listStream: (ctx: MeasureContext, workspaceId: WorkspaceId) => Promise stat: (ctx: MeasureContext, workspaceId: WorkspaceId, objectName: string) => Promise get: (ctx: MeasureContext, workspaceId: WorkspaceId, objectName: string) => Promise put: ( @@ -114,15 +114,11 @@ export class DummyStorageAdapter implements StorageAdapter, StorageAdapterEx { async remove (ctx: MeasureContext, workspaceId: WorkspaceId, objectNames: string[]): Promise {} - async list (ctx: MeasureContext, workspaceId: WorkspaceId, prefix?: string | undefined): Promise { + async list (ctx: MeasureContext, workspaceId: WorkspaceId): Promise { return [] } - async listStream ( - ctx: MeasureContext, - workspaceId: WorkspaceId, - prefix?: string | undefined - ): Promise { + async listStream (ctx: MeasureContext, workspaceId: WorkspaceId): Promise { return { next: async (): Promise => { return undefined @@ -203,11 +199,10 @@ export async function removeAllObjects ( export async function objectsToArray ( ctx: MeasureContext, storage: StorageAdapter, - workspaceId: WorkspaceId, - prefix?: string + workspaceId: WorkspaceId ): Promise { // We need to list all files and delete them - const iterator = await storage.listStream(ctx, workspaceId, prefix) + const iterator = await storage.listStream(ctx, workspaceId) const bulk: ListBlobResult[] = [] while (true) { const obj = await iterator.next() diff --git a/server/core/src/__tests__/memAdapters.ts b/server/core/src/__tests__/memAdapters.ts index 56aa32435b..a3109c6a59 100644 --- a/server/core/src/__tests__/memAdapters.ts +++ b/server/core/src/__tests__/memAdapters.ts @@ -52,11 +52,7 @@ export class MemStorageAdapter implements StorageAdapter { } } - async listStream ( - ctx: MeasureContext, - workspaceId: WorkspaceId, - prefix?: string | undefined - ): Promise { + async listStream (ctx: MeasureContext, workspaceId: WorkspaceId): Promise { const files = Array.from(this.files.values()).filter((it) => it.workspace === workspaceId.name) return { next: async () => { diff --git a/server/core/src/server/aggregator.ts b/server/core/src/server/aggregator.ts index d11c2546ac..d000c7a2b0 100644 --- a/server/core/src/server/aggregator.ts +++ b/server/core/src/server/aggregator.ts @@ -224,14 +224,8 @@ export class AggregatorStorageAdapter implements StorageAdapter, StorageAdapterE await this.dbAdapter.clean(ctx, workspaceId, DOMAIN_BLOB, objectNames as Ref[]) } - async listStream ( - ctx: MeasureContext, - workspaceId: WorkspaceId, - prefix?: string | undefined - ): Promise { - const data = await this.dbAdapter.findStream(ctx, workspaceId, DOMAIN_BLOB, { - _id: { $regex: `${prefix ?? ''}.*` } - }) + async listStream (ctx: MeasureContext, workspaceId: WorkspaceId): Promise { + const data = await this.dbAdapter.findStream(ctx, workspaceId, DOMAIN_BLOB, {}) return { next: async (): Promise => { return await data.next() diff --git a/server/datalake/src/index.ts b/server/datalake/src/index.ts index 7746512e7a..930be31961 100644 --- a/server/datalake/src/index.ts +++ b/server/datalake/src/index.ts @@ -73,11 +73,7 @@ export class DatalakeService implements StorageAdapter { } @withContext('listStream') - async listStream ( - ctx: MeasureContext, - workspaceId: WorkspaceId, - prefix?: string | undefined - ): Promise { + async listStream (ctx: MeasureContext, workspaceId: WorkspaceId): Promise { throw new Error('not supported') } diff --git a/server/minio/src/index.ts b/server/minio/src/index.ts index fcad03afec..f0987da53f 100644 --- a/server/minio/src/index.ts +++ b/server/minio/src/index.ts @@ -182,11 +182,7 @@ export class MinioService implements StorageAdapter { } @withContext('listStream') - async listStream ( - ctx: MeasureContext, - workspaceId: WorkspaceId, - prefix?: string | undefined - ): Promise { + async listStream (ctx: MeasureContext, workspaceId: WorkspaceId): Promise { let hasMore = true let stream: BucketStream | undefined let done = false @@ -199,7 +195,7 @@ export class MinioService implements StorageAdapter { next: async (): Promise => { try { if (stream === undefined && !done) { - const rprefix = rootPrefix !== undefined ? rootPrefix + (prefix ?? '') : prefix ?? '' + const rprefix = rootPrefix ?? '' stream = this.client.listObjects(this.getBucketId(workspaceId), rprefix, true) stream.on('end', () => { stream?.destroy() diff --git a/server/s3/src/index.ts b/server/s3/src/index.ts index 08040f9a76..cc2c435481 100644 --- a/server/s3/src/index.ts +++ b/server/s3/src/index.ts @@ -232,11 +232,7 @@ export class S3Service implements StorageAdapter { } @withContext('listStream') - async listStream ( - ctx: MeasureContext, - workspaceId: WorkspaceId, - prefix?: string | undefined - ): Promise { + async listStream (ctx: MeasureContext, workspaceId: WorkspaceId): Promise { let hasMore = true const buffer: ListBlobResult[] = [] let token: string | undefined @@ -248,7 +244,7 @@ export class S3Service implements StorageAdapter { if (hasMore && buffer.length === 0) { const res = await this.client.listObjectsV2({ Bucket: this.getBucketId(workspaceId), - Prefix: rootPrefix !== undefined ? rootPrefix + (prefix ?? '') : prefix ?? '', + Prefix: rootPrefix ?? '', ContinuationToken: token }) if (res.IsTruncated === true) { @@ -273,7 +269,7 @@ export class S3Service implements StorageAdapter { } } } catch (err: any) { - ctx.error('Failed to get list', { error: err, workspaceId: workspaceId.name, prefix }) + ctx.error('Failed to get list', { error: err, workspaceId: workspaceId.name }) } if (buffer.length > 0) { return buffer.shift()