feat(nbstore): add blob sync frontend (#9084)

This commit is contained in:
EYHN 2024-12-11 12:39:26 +00:00
parent a67fbc9448
commit 0c0722c650
No known key found for this signature in database
GPG Key ID: 46C9E26A75AB276C
3 changed files with 54 additions and 3 deletions

View File

@ -0,0 +1,23 @@
import type { BlobRecord, BlobStorage } from '../storage';
import type { BlobSyncEngine } from '../sync/blob';
export class BlobFrontend {
constructor(
readonly storage: BlobStorage,
readonly sync?: BlobSyncEngine
) {}
get(blobId: string) {
return this.sync
? this.sync.downloadBlob(blobId)
: this.storage.get(blobId);
}
set(blob: BlobRecord) {
return this.sync ? this.sync.uploadBlob(blob) : this.storage.set(blob);
}
addPriority(id: string, priority: number) {
return this.sync?.addPriority(id, priority);
}
}

View File

@ -6,14 +6,14 @@ export interface BlobRecord {
key: string; key: string;
data: Uint8Array; data: Uint8Array;
mime: string; mime: string;
createdAt: Date; createdAt?: Date;
} }
export interface ListedBlobRecord { export interface ListedBlobRecord {
key: string; key: string;
mime: string; mime: string;
size: number; size: number;
createdAt: Date; createdAt?: Date;
} }
export abstract class BlobStorage< export abstract class BlobStorage<

View File

@ -1,6 +1,6 @@
import { difference } from 'lodash-es'; import { difference } from 'lodash-es';
import type { BlobStorage } from '../../storage'; import type { BlobRecord, BlobStorage } from '../../storage';
import { MANUALLY_STOP, throwIfAborted } from '../../utils/throw-if-aborted'; import { MANUALLY_STOP, throwIfAborted } from '../../utils/throw-if-aborted';
export class BlobSyncEngine { export class BlobSyncEngine {
@ -11,6 +11,29 @@ export class BlobSyncEngine {
readonly remotes: BlobStorage[] readonly remotes: BlobStorage[]
) {} ) {}
async downloadBlob(blobId: string, signal?: AbortSignal) {
const localBlob = await this.local.get(blobId, signal);
if (localBlob) {
return localBlob;
}
for (const storage of this.remotes) {
const data = await storage.get(blobId, signal);
if (data) {
await this.local.set(data, signal);
return data;
}
}
return null;
}
async uploadBlob(blob: BlobRecord, signal?: AbortSignal) {
await this.local.set(blob);
await Promise.allSettled(
this.remotes.map(remote => remote.set(blob, signal))
);
}
private async sync(signal?: AbortSignal) { private async sync(signal?: AbortSignal) {
throwIfAborted(signal); throwIfAborted(signal);
@ -94,4 +117,9 @@ export class BlobSyncEngine {
this.abort?.abort(); this.abort?.abort();
this.abort = null; this.abort = null;
} }
addPriority(_id: string, _priority: number): () => void {
// TODO: implement
return () => {};
}
} }