mirror of
https://github.com/toeverything/AFFiNE.git
synced 2024-12-28 12:32:09 +03:00
fix: store full state as update as solution for serialization
This commit is contained in:
parent
b1943aaad9
commit
0c21ccb04b
@ -1,16 +1,9 @@
|
|||||||
use ipc_types::document::{
|
use ipc_types::document::{
|
||||||
CreateDocumentParameter, GetDocumentParameter, GetDocumentResponse, YDocumentUpdate,
|
CreateDocumentParameter, GetDocumentParameter, GetDocumentResponse, YDocumentUpdate,
|
||||||
};
|
};
|
||||||
use jwst::encode_update;
|
|
||||||
use jwst::DocStorage;
|
use jwst::DocStorage;
|
||||||
use jwst::Workspace as OctoBaseWorkspace;
|
use jwst::Workspace as OctoBaseWorkspace;
|
||||||
use lib0::any::Any;
|
use lib0::any::Any;
|
||||||
use std::sync::{Arc, RwLock};
|
|
||||||
use y_sync::sync::Message;
|
|
||||||
use y_sync::sync::MessageReader;
|
|
||||||
use y_sync::sync::SyncMessage;
|
|
||||||
use yrs::updates::decoder::DecoderV1;
|
|
||||||
use yrs::{updates::decoder::Decode, Doc, StateVector, Update};
|
|
||||||
|
|
||||||
use crate::state::AppState;
|
use crate::state::AppState;
|
||||||
|
|
||||||
@ -53,7 +46,6 @@ pub async fn get_doc<'s>(
|
|||||||
) -> Result<GetDocumentResponse, String> {
|
) -> Result<GetDocumentResponse, String> {
|
||||||
// TODO: check user permission
|
// TODO: check user permission
|
||||||
let state = &state.0.lock().await;
|
let state = &state.0.lock().await;
|
||||||
let doc_store = &state.doc_store;
|
|
||||||
let doc_db = &state.doc_db;
|
let doc_db = &state.doc_db;
|
||||||
|
|
||||||
if let Ok(all_updates_of_workspace) = doc_db.all(¶meters.id).await {
|
if let Ok(all_updates_of_workspace) = doc_db.all(¶meters.id).await {
|
||||||
@ -61,18 +53,8 @@ pub async fn get_doc<'s>(
|
|||||||
.iter()
|
.iter()
|
||||||
.map(|model| model.blob.clone())
|
.map(|model| model.blob.clone())
|
||||||
.collect::<Vec<Vec<u8>>>();
|
.collect::<Vec<Vec<u8>>>();
|
||||||
all_updates
|
|
||||||
.iter()
|
|
||||||
.for_each(|update_blob| {
|
|
||||||
let mut tx = doc_store.doc().transact();
|
|
||||||
let update = Update::decode_v1(&update_blob).unwrap();
|
|
||||||
tx.apply_update(update);
|
|
||||||
tx.commit();
|
|
||||||
});
|
|
||||||
let merged_update = doc_store.doc().transact().encode_update_v1();
|
|
||||||
// TODO: store merged update here
|
|
||||||
Ok(GetDocumentResponse {
|
Ok(GetDocumentResponse {
|
||||||
updates: vec![merged_update],
|
updates: all_updates,
|
||||||
})
|
})
|
||||||
} else {
|
} else {
|
||||||
Err(format!(
|
Err(format!(
|
||||||
@ -88,16 +70,10 @@ pub async fn update_y_document<'s>(
|
|||||||
parameters: YDocumentUpdate,
|
parameters: YDocumentUpdate,
|
||||||
) -> Result<bool, String> {
|
) -> Result<bool, String> {
|
||||||
let state = &state.0.lock().await;
|
let state = &state.0.lock().await;
|
||||||
let doc_store = &state.doc_store;
|
|
||||||
let mut tx = doc_store.doc().transact();
|
|
||||||
let update = Update::decode_v1(¶meters.update).unwrap();
|
|
||||||
tx.apply_update(update);
|
|
||||||
let merged_update = tx.encode_update_v1();
|
|
||||||
tx.commit();
|
|
||||||
let doc_db = &state.doc_db;
|
let doc_db = &state.doc_db;
|
||||||
|
|
||||||
doc_db
|
doc_db
|
||||||
.replace_with(¶meters.id.clone(), merged_update)
|
.replace_with(¶meters.id.clone(), parameters.update)
|
||||||
.await
|
.await
|
||||||
.ok();
|
.ok();
|
||||||
|
|
||||||
|
@ -7,8 +7,6 @@ use tokio::sync::Mutex;
|
|||||||
|
|
||||||
pub struct AppStateRaw {
|
pub struct AppStateRaw {
|
||||||
pub doc_db: DocAutoStorage,
|
pub doc_db: DocAutoStorage,
|
||||||
/// yDoc for receiving yjs update and merge them, before serialize update into sqlite
|
|
||||||
pub doc_store: Workspace,
|
|
||||||
pub blob_storage: BlobAutoStorage,
|
pub blob_storage: BlobAutoStorage,
|
||||||
pub metadata_db: SqliteDBContext,
|
pub metadata_db: SqliteDBContext,
|
||||||
}
|
}
|
||||||
@ -41,9 +39,6 @@ impl AppStateRaw {
|
|||||||
|
|
||||||
Some(Self {
|
Some(Self {
|
||||||
doc_db: DocAutoStorage::init_pool(&doc_db_env).await.unwrap(),
|
doc_db: DocAutoStorage::init_pool(&doc_db_env).await.unwrap(),
|
||||||
// with fake id, we only use yDoc inside of it
|
|
||||||
// TODO: use workspace pool, to handle multiple workspace
|
|
||||||
doc_store: Workspace::new(""),
|
|
||||||
blob_storage: BlobAutoStorage::init_pool(&blob_db_env).await.unwrap(),
|
blob_storage: BlobAutoStorage::init_pool(&blob_db_env).await.unwrap(),
|
||||||
metadata_db: SqliteDBContext::new(metadata_db_env).await,
|
metadata_db: SqliteDBContext::new(metadata_db_env).await,
|
||||||
})
|
})
|
||||||
|
@ -39,7 +39,7 @@ export class TauriIPCProvider extends LocalProvider {
|
|||||||
}
|
}
|
||||||
// we create a default user if we don't have one.
|
// we create a default user if we don't have one.
|
||||||
try {
|
try {
|
||||||
const user = await this.#ipc!.createUser({
|
const user = await this.#ipc?.createUser({
|
||||||
email: 'xxx@xx.xx',
|
email: 'xxx@xx.xx',
|
||||||
name: 'xxx',
|
name: 'xxx',
|
||||||
password: 'xxx',
|
password: 'xxx',
|
||||||
@ -57,7 +57,7 @@ export class TauriIPCProvider extends LocalProvider {
|
|||||||
blocksuiteWorkspace: BlocksuiteWorkspace
|
blocksuiteWorkspace: BlocksuiteWorkspace
|
||||||
) {
|
) {
|
||||||
this._logger(`Loading ${workspaceID}...`);
|
this._logger(`Loading ${workspaceID}...`);
|
||||||
const result = await this.#ipc!.getYDocument({ id: workspaceID });
|
const result = await this.#ipc?.getYDocument({ id: workspaceID });
|
||||||
if (result) {
|
if (result) {
|
||||||
const updates = result.updates.map(
|
const updates = result.updates.map(
|
||||||
binaryUpdate => new Uint8Array(binaryUpdate)
|
binaryUpdate => new Uint8Array(binaryUpdate)
|
||||||
@ -65,21 +65,6 @@ export class TauriIPCProvider extends LocalProvider {
|
|||||||
|
|
||||||
const mergedUpdate = Y.mergeUpdates(updates);
|
const mergedUpdate = Y.mergeUpdates(updates);
|
||||||
await applyUpdate(blocksuiteWorkspace, mergedUpdate);
|
await applyUpdate(blocksuiteWorkspace, mergedUpdate);
|
||||||
console.group('#initDocFromIPC');
|
|
||||||
// DEBUG: console blocksuiteWorkspace.room
|
|
||||||
console.log(`blocksuiteWorkspace.room`, blocksuiteWorkspace.room);
|
|
||||||
// DEBUG: console blocksuiteWorkspace.doc.guid
|
|
||||||
console.log(`blocksuiteWorkspace.doc.guid`, blocksuiteWorkspace.doc.guid);
|
|
||||||
// DEBUG: console blocksuiteWorkspace
|
|
||||||
console.log(`blocksuiteWorkspace`, blocksuiteWorkspace);
|
|
||||||
// DEBUG: console blocksuiteWorkspace.meta
|
|
||||||
console.log(`blocksuiteWorkspace.meta`, blocksuiteWorkspace.meta);
|
|
||||||
// DEBUG: console blocksuiteWorkspace.meta.pageMetas
|
|
||||||
console.log(
|
|
||||||
`blocksuiteWorkspace.meta.pageMetas`,
|
|
||||||
blocksuiteWorkspace.meta.pageMetas
|
|
||||||
);
|
|
||||||
console.groupEnd();
|
|
||||||
this._logger(`Loaded: ${workspaceID}`);
|
this._logger(`Loaded: ${workspaceID}`);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -91,33 +76,14 @@ export class TauriIPCProvider extends LocalProvider {
|
|||||||
this._logger(`Connecting yDoc for ${workspaceID}...`);
|
this._logger(`Connecting yDoc for ${workspaceID}...`);
|
||||||
blocksuiteWorkspace.doc.on('update', async (update: Uint8Array) => {
|
blocksuiteWorkspace.doc.on('update', async (update: Uint8Array) => {
|
||||||
try {
|
try {
|
||||||
// TODO: need handle potential data race when update is frequent?
|
const binary = Y.encodeStateAsUpdate(blocksuiteWorkspace.doc);
|
||||||
// TODO: update seems too frequent upon each keydown, why no batching?
|
const success = await this.#ipc?.updateYDocument({
|
||||||
const success = await this.#ipc!.updateYDocument({
|
update: Array.from(binary),
|
||||||
update: Array.from(update),
|
|
||||||
id: workspaceID,
|
id: workspaceID,
|
||||||
});
|
});
|
||||||
if (!success) {
|
if (!success) {
|
||||||
throw new Error(`YDoc update failed, id: ${workspaceID}`);
|
throw new Error(`YDoc update failed, id: ${workspaceID}`);
|
||||||
}
|
}
|
||||||
console.group('update');
|
|
||||||
// DEBUG: console blocksuiteWorkspa?ce.meta
|
|
||||||
console.log(`blocksuiteWorkspace?.meta`, blocksuiteWorkspace?.meta);
|
|
||||||
// DEBUG: console blocksuiteWorkspace?.meta?.pageMetas
|
|
||||||
console.log(
|
|
||||||
`blocksuiteWorkspace?.meta?.pageMetas`,
|
|
||||||
blocksuiteWorkspace?.meta?.pageMetas
|
|
||||||
);
|
|
||||||
// DEBUG: console doc
|
|
||||||
console.log(`doc1`, blocksuiteWorkspace.doc);
|
|
||||||
// DEBUG: console doc.meta
|
|
||||||
console.log(`doc1.meta`, blocksuiteWorkspace.doc.meta);
|
|
||||||
// DEBUG: console doc.meta.pageMetas
|
|
||||||
console.log(
|
|
||||||
`doc1.meta?.pageMetas`,
|
|
||||||
blocksuiteWorkspace.doc.meta?.pageMetas
|
|
||||||
);
|
|
||||||
console.groupEnd();
|
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
// TODO: write error log to disk, and add button to open them in settings panel
|
// TODO: write error log to disk, and add button to open them in settings panel
|
||||||
console.error("#yDocument.on('update'", error);
|
console.error("#yDocument.on('update'", error);
|
||||||
@ -130,7 +96,7 @@ export class TauriIPCProvider extends LocalProvider {
|
|||||||
}
|
}
|
||||||
|
|
||||||
override async warpWorkspace(blocksuiteWorkspace: BlocksuiteWorkspace) {
|
override async warpWorkspace(blocksuiteWorkspace: BlocksuiteWorkspace) {
|
||||||
const { doc, room } = blocksuiteWorkspace;
|
const { room } = blocksuiteWorkspace;
|
||||||
assert(room);
|
assert(room);
|
||||||
|
|
||||||
(await blocksuiteWorkspace.blobs)?.addProvider(new IPCBlobProvider());
|
(await blocksuiteWorkspace.blobs)?.addProvider(new IPCBlobProvider());
|
||||||
@ -144,8 +110,8 @@ export class TauriIPCProvider extends LocalProvider {
|
|||||||
meta: CreateWorkspaceInfoParams
|
meta: CreateWorkspaceInfoParams
|
||||||
): Promise<WorkspaceUnit | undefined> {
|
): Promise<WorkspaceUnit | undefined> {
|
||||||
this._logger('Creating client app workspace');
|
this._logger('Creating client app workspace');
|
||||||
|
assert(this.#ipc);
|
||||||
const { id } = await this.#ipc!.createWorkspace({
|
const { id } = await this.#ipc.createWorkspace({
|
||||||
name: meta.name,
|
name: meta.name,
|
||||||
// TODO: get userID here
|
// TODO: get userID here
|
||||||
user_id: this.#defaultUserID,
|
user_id: this.#defaultUserID,
|
||||||
@ -165,7 +131,7 @@ export class TauriIPCProvider extends LocalProvider {
|
|||||||
const doc = workspaceUnit?.blocksuiteWorkspace?.doc;
|
const doc = workspaceUnit?.blocksuiteWorkspace?.doc;
|
||||||
if (doc) {
|
if (doc) {
|
||||||
const update = Y.encodeStateAsUpdate(doc);
|
const update = Y.encodeStateAsUpdate(doc);
|
||||||
const success = await this.#ipc!.updateYDocument({
|
const success = await this.#ipc?.updateYDocument({
|
||||||
update: Array.from(update),
|
update: Array.from(update),
|
||||||
id,
|
id,
|
||||||
});
|
});
|
||||||
@ -174,7 +140,8 @@ export class TauriIPCProvider extends LocalProvider {
|
|||||||
}
|
}
|
||||||
|
|
||||||
override async loadWorkspaces(): Promise<WorkspaceUnit[]> {
|
override async loadWorkspaces(): Promise<WorkspaceUnit[]> {
|
||||||
const { workspaces } = await this.#ipc!.getWorkspaces({
|
assert(this.#ipc);
|
||||||
|
const { workspaces } = await this.#ipc.getWorkspaces({
|
||||||
user_id: this.#defaultUserID,
|
user_id: this.#defaultUserID,
|
||||||
});
|
});
|
||||||
const workspaceUnits = await Promise.all(
|
const workspaceUnits = await Promise.all(
|
||||||
@ -193,12 +160,3 @@ export class TauriIPCProvider extends LocalProvider {
|
|||||||
return workspaceUnits;
|
return workspaceUnits;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
function stringifyUint8Array(uint8: Uint8Array): string {
|
|
||||||
return (
|
|
||||||
' ' +
|
|
||||||
Array.from(uint8)
|
|
||||||
.map(a => a.toString(16).padStart(2, '0'))
|
|
||||||
.join(' ')
|
|
||||||
).replace(/((?: \S+){8})/g, '$1\n');
|
|
||||||
}
|
|
||||||
|
Loading…
Reference in New Issue
Block a user