refactor(electron): use sqlite to store server clock & sync meta (#6957)

After this PR, IDB should not be used in desktop any longer.
This commit is contained in:
pengx17 2024-05-16 06:31:04 +00:00
parent 27af9b4d1a
commit 3cca879a83
No known key found for this signature in database
GPG Key ID: 23F23D9E8B3971ED
8 changed files with 322 additions and 84 deletions

View File

@ -4,6 +4,7 @@
"private": true,
"exports": {
"./blocksuite": "./src/blocksuite/index.ts",
"./storage": "./src/storage/index.ts",
"./app-config-storage": "./src/app-config-storage.ts",
".": "./src/index.ts"
},

View File

@ -1,15 +1,13 @@
import { apis } from '@affine/electron-api';
import type { ByteKV, ByteKVBehavior, DocStorage } from '@toeverything/infra';
import { AsyncLock, MemoryDocEventBus } from '@toeverything/infra';
import type { DBSchema, IDBPDatabase, IDBPObjectStore } from 'idb';
import { openDB } from 'idb';
export class SqliteDocStorage implements DocStorage {
constructor(private readonly workspaceId: string) {}
eventBus = new MemoryDocEventBus();
readonly doc = new Doc(this.workspaceId);
readonly syncMetadata = new KV(`${this.workspaceId}:sync-metadata`);
readonly serverClock = new KV(`${this.workspaceId}:server-clock`);
readonly syncMetadata = new SyncMetadataKV(this.workspaceId);
readonly serverClock = new ServerClockKV(this.workspaceId);
}
type DocType = DocStorage['doc'];
@ -72,102 +70,86 @@ class Doc implements DocType {
}
}
interface KvDBSchema extends DBSchema {
kv: {
key: string;
value: { key: string; val: Uint8Array };
};
}
class KV implements ByteKV {
constructor(private readonly dbName: string) {}
dbPromise: Promise<IDBPDatabase<KvDBSchema>> | null = null;
dbVersion = 1;
upgradeDB(db: IDBPDatabase<KvDBSchema>) {
db.createObjectStore('kv', { keyPath: 'key' });
class SyncMetadataKV implements ByteKV {
constructor(private readonly workspaceId: string) {}
transaction<T>(cb: (behavior: ByteKVBehavior) => Promise<T>): Promise<T> {
return cb(this);
}
getDb() {
if (this.dbPromise === null) {
this.dbPromise = openDB<KvDBSchema>(this.dbName, this.dbVersion, {
upgrade: db => this.upgradeDB(db),
});
get(key: string): Uint8Array | null | Promise<Uint8Array | null> {
if (!apis?.db) {
throw new Error('sqlite datasource is not available');
}
return this.dbPromise;
return apis.db.getSyncMetadata(this.workspaceId, key);
}
async transaction<T>(
cb: (transaction: ByteKVBehavior) => Promise<T>
): Promise<T> {
const db = await this.getDb();
const store = db.transaction('kv', 'readwrite').objectStore('kv');
const behavior = new KVBehavior(store);
return await cb(behavior);
set(key: string, data: Uint8Array): void | Promise<void> {
if (!apis?.db) {
throw new Error('sqlite datasource is not available');
}
return apis.db.setSyncMetadata(this.workspaceId, key, data);
}
async get(key: string): Promise<Uint8Array | null> {
const db = await this.getDb();
const store = db.transaction('kv', 'readonly').objectStore('kv');
return new KVBehavior(store).get(key);
keys(): string[] | Promise<string[]> {
if (!apis?.db) {
throw new Error('sqlite datasource is not available');
}
return apis.db.getSyncMetadataKeys(this.workspaceId);
}
async set(key: string, value: Uint8Array): Promise<void> {
const db = await this.getDb();
const store = db.transaction('kv', 'readwrite').objectStore('kv');
return new KVBehavior(store).set(key, value);
del(key: string): void | Promise<void> {
if (!apis?.db) {
throw new Error('sqlite datasource is not available');
}
return apis.db.delSyncMetadata(this.workspaceId, key);
}
async keys(): Promise<string[]> {
const db = await this.getDb();
const store = db.transaction('kv', 'readwrite').objectStore('kv');
return new KVBehavior(store).keys();
}
async clear() {
const db = await this.getDb();
const store = db.transaction('kv', 'readwrite').objectStore('kv');
return new KVBehavior(store).clear();
}
async del(key: string) {
const db = await this.getDb();
const store = db.transaction('kv', 'readwrite').objectStore('kv');
return new KVBehavior(store).del(key);
clear(): void | Promise<void> {
if (!apis?.db) {
throw new Error('sqlite datasource is not available');
}
return apis.db.clearSyncMetadata(this.workspaceId);
}
}
class KVBehavior implements ByteKVBehavior {
constructor(
private readonly store: IDBPObjectStore<KvDBSchema, ['kv'], 'kv', any>
) {}
async get(key: string): Promise<Uint8Array | null> {
const value = await this.store.get(key);
return value?.val ?? null;
}
async set(key: string, value: Uint8Array): Promise<void> {
if (this.store.put === undefined) {
throw new Error('Cannot set in a readonly transaction');
}
await this.store.put({
key: key,
val: value,
});
}
async keys(): Promise<string[]> {
return await this.store.getAllKeys();
class ServerClockKV implements ByteKV {
constructor(private readonly workspaceId: string) {}
transaction<T>(cb: (behavior: ByteKVBehavior) => Promise<T>): Promise<T> {
return cb(this);
}
async del(key: string) {
if (this.store.delete === undefined) {
throw new Error('Cannot set in a readonly transaction');
get(key: string): Uint8Array | null | Promise<Uint8Array | null> {
if (!apis?.db) {
throw new Error('sqlite datasource is not available');
}
return await this.store.delete(key);
return apis.db.getServerClock(this.workspaceId, key);
}
async clear() {
if (this.store.clear === undefined) {
throw new Error('Cannot set in a readonly transaction');
set(key: string, data: Uint8Array): void | Promise<void> {
if (!apis?.db) {
throw new Error('sqlite datasource is not available');
}
return await this.store.clear();
return apis.db.setServerClock(this.workspaceId, key, data);
}
keys(): string[] | Promise<string[]> {
if (!apis?.db) {
throw new Error('sqlite datasource is not available');
}
return apis.db.getServerClockKeys(this.workspaceId);
}
del(key: string): void | Promise<void> {
if (!apis?.db) {
throw new Error('sqlite datasource is not available');
}
return apis.db.delServerClock(this.workspaceId, key);
}
clear(): void | Promise<void> {
if (!apis?.db) {
throw new Error('sqlite datasource is not available');
}
return apis.db.clearServerClock(this.workspaceId);
}
}

View File

@ -1,6 +1,7 @@
import type { InsertRow } from '@affine/native';
import { SqliteConnection, ValidationResult } from '@affine/native';
import { WorkspaceVersion } from '@toeverything/infra/blocksuite';
import type { ByteKVBehavior } from '@toeverything/infra/storage';
import { logger } from '../logger';
import { applyGuidCompatibilityFix, migrateToLatest } from './migration';
@ -175,4 +176,82 @@ export class SQLiteAdapter {
logger.error('replaceUpdates', error);
}
}
serverClock: ByteKVBehavior = {
get: async key => {
if (!this.db) {
logger.warn(`${this.path} is not connected`);
return null;
}
const blob = await this.db.getServerClock(key);
return blob?.data ?? null;
},
set: async (key, data) => {
if (!this.db) {
logger.warn(`${this.path} is not connected`);
return;
}
await this.db.setServerClock(key, data);
},
keys: async () => {
if (!this.db) {
logger.warn(`${this.path} is not connected`);
return [];
}
return await this.db.getServerClockKeys();
},
del: async key => {
if (!this.db) {
logger.warn(`${this.path} is not connected`);
return;
}
await this.db.delServerClock(key);
},
clear: async () => {
if (!this.db) {
logger.warn(`${this.path} is not connected`);
return;
}
await this.db.clearServerClock();
},
};
syncMetadata: ByteKVBehavior = {
get: async key => {
if (!this.db) {
logger.warn(`${this.path} is not connected`);
return null;
}
const blob = await this.db.getSyncMetadata(key);
return blob?.data ?? null;
},
set: async (key, data) => {
if (!this.db) {
logger.warn(`${this.path} is not connected`);
return;
}
await this.db.setSyncMetadata(key, data);
},
keys: async () => {
if (!this.db) {
logger.warn(`${this.path} is not connected`);
return [];
}
return await this.db.getSyncMetadataKeys();
},
del: async key => {
if (!this.db) {
logger.warn(`${this.path} is not connected`);
return;
}
await this.db.delSyncMetadata(key);
},
clear: async () => {
if (!this.db) {
logger.warn(`${this.path} is not connected`);
return;
}
await this.db.clearSyncMetadata();
},
};
}

View File

@ -40,6 +40,54 @@ export const dbHandlers = {
getDefaultStorageLocation: async () => {
return await mainRPC.getPath('sessionData');
},
getServerClock: async (workspaceId: string, key: string) => {
const workspaceDB = await ensureSQLiteDB(workspaceId);
return workspaceDB.adapter.serverClock.get(key);
},
setServerClock: async (
workspaceId: string,
key: string,
data: Uint8Array
) => {
const workspaceDB = await ensureSQLiteDB(workspaceId);
return workspaceDB.adapter.serverClock.set(key, data);
},
getServerClockKeys: async (workspaceId: string) => {
const workspaceDB = await ensureSQLiteDB(workspaceId);
return workspaceDB.adapter.serverClock.keys();
},
clearServerClock: async (workspaceId: string) => {
const workspaceDB = await ensureSQLiteDB(workspaceId);
return workspaceDB.adapter.serverClock.clear();
},
delServerClock: async (workspaceId: string, key: string) => {
const workspaceDB = await ensureSQLiteDB(workspaceId);
return workspaceDB.adapter.serverClock.del(key);
},
getSyncMetadata: async (workspaceId: string, key: string) => {
const workspaceDB = await ensureSQLiteDB(workspaceId);
return workspaceDB.adapter.syncMetadata.get(key);
},
setSyncMetadata: async (
workspaceId: string,
key: string,
data: Uint8Array
) => {
const workspaceDB = await ensureSQLiteDB(workspaceId);
return workspaceDB.adapter.syncMetadata.set(key, data);
},
getSyncMetadataKeys: async (workspaceId: string) => {
const workspaceDB = await ensureSQLiteDB(workspaceId);
return workspaceDB.adapter.syncMetadata.keys();
},
clearSyncMetadata: async (workspaceId: string) => {
const workspaceDB = await ensureSQLiteDB(workspaceId);
return workspaceDB.adapter.syncMetadata.clear();
},
delSyncMetadata: async (workspaceId: string, key: string) => {
const workspaceDB = await ensureSQLiteDB(workspaceId);
return workspaceDB.adapter.syncMetadata.del(key);
},
};
export const dbEvents = {} satisfies Record<string, MainEventRegister>;

View File

@ -31,7 +31,7 @@ export class WorkspaceSQLiteDB {
this.update$.complete();
}
toDBDocId = (docId: string) => {
private readonly toDBDocId = (docId: string) => {
return this.workspaceId === docId ? undefined : docId;
};

View File

@ -13,6 +13,16 @@ export class SqliteConnection {
getAllUpdates(): Promise<Array<UpdateRow>>
insertUpdates(updates: Array<InsertRow>): Promise<void>
replaceUpdates(docId: string | undefined | null, updates: Array<InsertRow>): Promise<void>
getServerClock(key: string): Promise<BlobRow | null>
setServerClock(key: string, data: Uint8Array): Promise<void>
getServerClockKeys(): Promise<Array<string>>
clearServerClock(): Promise<void>
delServerClock(key: string): Promise<void>
getSyncMetadata(key: string): Promise<BlobRow | null>
setSyncMetadata(key: string, data: Uint8Array): Promise<void>
getSyncMetadataKeys(): Promise<Array<string>>
clearSyncMetadata(): Promise<void>
delSyncMetadata(key: string): Promise<void>
initVersion(): Promise<void>
setVersion(version: number): Promise<void>
getMaxVersion(): Promise<number>

View File

@ -15,5 +15,15 @@ CREATE TABLE IF NOT EXISTS "blobs" (
CREATE TABLE IF NOT EXISTS "version_info" (
version NUMBER NOT NULL,
timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL
);
CREATE TABLE IF NOT EXISTS "server_clock" (
key TEXT PRIMARY KEY NOT NULL,
data BLOB NOT NULL,
timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL
);
CREATE TABLE IF NOT EXISTS "sync_metadata" (
key TEXT PRIMARY KEY NOT NULL,
data BLOB NOT NULL,
timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL
)
"#;

View File

@ -252,6 +252,114 @@ impl SqliteConnection {
Ok(())
}
#[napi]
pub async fn get_server_clock(&self, key: String) -> Option<BlobRow> {
sqlx::query_as!(
BlobRow,
"SELECT key, data, timestamp FROM server_clock WHERE key = ?",
key
)
.fetch_one(&self.pool)
.await
.ok()
}
#[napi]
pub async fn set_server_clock(&self, key: String, data: Uint8Array) -> napi::Result<()> {
let data = data.as_ref();
sqlx::query!(
"INSERT INTO server_clock (key, data) VALUES ($1, $2) ON CONFLICT(key) DO UPDATE SET data = excluded.data",
key,
data,
)
.execute(&self.pool)
.await
.map_err(anyhow::Error::from)?;
Ok(())
}
#[napi]
pub async fn get_server_clock_keys(&self) -> napi::Result<Vec<String>> {
let keys = sqlx::query!("SELECT key FROM server_clock")
.fetch_all(&self.pool)
.await
.map(|rows| rows.into_iter().map(|row| row.key).collect())
.map_err(anyhow::Error::from)?;
Ok(keys)
}
#[napi]
pub async fn clear_server_clock(&self) -> napi::Result<()> {
sqlx::query!("DELETE FROM server_clock")
.execute(&self.pool)
.await
.map_err(anyhow::Error::from)?;
Ok(())
}
#[napi]
pub async fn del_server_clock(&self, key: String) -> napi::Result<()> {
sqlx::query!("DELETE FROM server_clock WHERE key = ?", key)
.execute(&self.pool)
.await
.map_err(anyhow::Error::from)?;
Ok(())
}
#[napi]
pub async fn get_sync_metadata(&self, key: String) -> Option<BlobRow> {
sqlx::query_as!(
BlobRow,
"SELECT key, data, timestamp FROM sync_metadata WHERE key = ?",
key
)
.fetch_one(&self.pool)
.await
.ok()
}
#[napi]
pub async fn set_sync_metadata(&self, key: String, data: Uint8Array) -> napi::Result<()> {
let data = data.as_ref();
sqlx::query!(
"INSERT INTO sync_metadata (key, data) VALUES ($1, $2) ON CONFLICT(key) DO UPDATE SET data = excluded.data",
key,
data,
)
.execute(&self.pool)
.await
.map_err(anyhow::Error::from)?;
Ok(())
}
#[napi]
pub async fn get_sync_metadata_keys(&self) -> napi::Result<Vec<String>> {
let keys = sqlx::query!("SELECT key FROM sync_metadata")
.fetch_all(&self.pool)
.await
.map(|rows| rows.into_iter().map(|row| row.key).collect())
.map_err(anyhow::Error::from)?;
Ok(keys)
}
#[napi]
pub async fn clear_sync_metadata(&self) -> napi::Result<()> {
sqlx::query!("DELETE FROM sync_metadata")
.execute(&self.pool)
.await
.map_err(anyhow::Error::from)?;
Ok(())
}
#[napi]
pub async fn del_sync_metadata(&self, key: String) -> napi::Result<()> {
sqlx::query!("DELETE FROM sync_metadata WHERE key = ?", key)
.execute(&self.pool)
.await
.map_err(anyhow::Error::from)?;
Ok(())
}
#[napi]
pub async fn init_version(&self) -> napi::Result<()> {
// create version_info table