Refactor/rename crate (#1275)

This commit is contained in:
Nathan.fooo 2022-10-13 23:29:37 +08:00 committed by GitHub
parent 48bb80b1d0
commit cf4a2920f8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
68 changed files with 857 additions and 723 deletions

View File

@ -3,17 +3,17 @@ import 'package:flowy_sdk/dispatch/dispatch.dart';
import 'package:flowy_sdk/protobuf/flowy-folder/view.pb.dart';
import 'package:flowy_sdk/protobuf/flowy-error/errors.pb.dart';
import 'package:flowy_sdk/protobuf/flowy-sync/text_block.pb.dart';
import 'package:flowy_sdk/protobuf/flowy-text-block/entities.pb.dart';
import 'package:flowy_sdk/protobuf/flowy-sync/document.pb.dart';
import 'package:flowy_sdk/protobuf/flowy-document/entities.pb.dart';
class DocumentService {
Future<Either<TextBlockPB, FlowyError>> openDocument({
Future<Either<DocumentSnapshotPB, FlowyError>> openDocument({
required String docId,
}) async {
await FolderEventSetLatestView(ViewIdPB(value: docId)).send();
final payload = TextBlockIdPB(value: docId);
return TextBlockEventGetTextBlock(payload).send();
final payload = DocumentIdPB(value: docId);
return DocumentEventGetDocument(payload).send();
}
Future<Either<Unit, FlowyError>> applyEdit({
@ -22,10 +22,10 @@ class DocumentService {
String operations = "",
}) {
final payload = EditPayloadPB.create()
..textBlockId = docId
..docId = docId
..operations = operations
..delta = data;
return TextBlockEventApplyEdit(payload).send();
..operationsStr = data;
return DocumentEventApplyEdit(payload).send();
}
Future<Either<Unit, FlowyError>> closeDocument({required String docId}) {

View File

@ -3,7 +3,7 @@ import 'dart:io';
import 'package:app_flowy/startup/tasks/rust_sdk.dart';
import 'package:app_flowy/workspace/application/markdown/delta_markdown.dart';
import 'package:app_flowy/plugins/doc/application/share_service.dart';
import 'package:flowy_sdk/protobuf/flowy-text-block/entities.pb.dart';
import 'package:flowy_sdk/protobuf/flowy-document/entities.pb.dart';
import 'package:flowy_sdk/protobuf/flowy-folder/view.pb.dart';
import 'package:flowy_sdk/protobuf/flowy-error/errors.pb.dart';
import 'package:freezed_annotation/freezed_annotation.dart';

View File

@ -2,15 +2,16 @@ import 'dart:async';
import 'package:dartz/dartz.dart';
import 'package:flowy_sdk/dispatch/dispatch.dart';
import 'package:flowy_sdk/protobuf/flowy-error/errors.pb.dart';
import 'package:flowy_sdk/protobuf/flowy-text-block/protobuf.dart';
import 'package:flowy_sdk/protobuf/flowy-document/protobuf.dart';
class ShareService {
Future<Either<ExportDataPB, FlowyError>> export(String docId, ExportType type) {
Future<Either<ExportDataPB, FlowyError>> export(
String docId, ExportType type) {
final request = ExportPayloadPB.create()
..viewId = docId
..exportType = type;
return TextBlockEventExportDocument(request).send();
return DocumentEventExportDocument(request).send();
}
Future<Either<ExportDataPB, FlowyError>> exportText(String docId) {

View File

@ -20,7 +20,7 @@ import 'package:flowy_infra_ui/widget/rounded_button.dart';
import 'package:flowy_sdk/log.dart';
import 'package:flowy_sdk/protobuf/flowy-error/errors.pb.dart';
import 'package:flowy_sdk/protobuf/flowy-folder/view.pb.dart';
import 'package:flowy_sdk/protobuf/flowy-text-block/entities.pb.dart';
import 'package:flowy_sdk/protobuf/flowy-document/entities.pb.dart';
import 'package:flutter/material.dart';
import 'package:flutter_bloc/flutter_bloc.dart';
import 'package:provider/provider.dart';

View File

@ -16,7 +16,7 @@ import 'package:flowy_sdk/ffi.dart' as ffi;
import 'package:flowy_sdk/protobuf/flowy-user/protobuf.dart';
import 'package:flowy_sdk/protobuf/dart-ffi/protobuf.dart';
import 'package:flowy_sdk/protobuf/flowy-folder/protobuf.dart';
import 'package:flowy_sdk/protobuf/flowy-text-block/protobuf.dart';
import 'package:flowy_sdk/protobuf/flowy-document/protobuf.dart';
import 'package:flowy_sdk/protobuf/flowy-grid/protobuf.dart';
import 'package:flowy_sdk/protobuf/flowy-sync/protobuf.dart';
@ -30,7 +30,7 @@ part 'dart_event/flowy-folder/dart_event.dart';
part 'dart_event/flowy-net/dart_event.dart';
part 'dart_event/flowy-user/dart_event.dart';
part 'dart_event/flowy-grid/dart_event.dart';
part 'dart_event/flowy-text-block/dart_event.dart';
part 'dart_event/flowy-document/dart_event.dart';
enum FFIException {
RequestIsEmpty,
@ -56,7 +56,8 @@ class Dispatch {
}
}
Future<Either<Uint8List, Uint8List>> _extractPayload(Future<Either<FFIResponse, FlowyInternalError>> responseFuture) {
Future<Either<Uint8List, Uint8List>> _extractPayload(
Future<Either<FFIResponse, FlowyInternalError>> responseFuture) {
return responseFuture.then((result) {
return result.fold(
(response) {
@ -82,7 +83,8 @@ Future<Either<Uint8List, Uint8List>> _extractPayload(Future<Either<FFIResponse,
});
}
Future<Either<FFIResponse, FlowyInternalError>> _extractResponse(Completer<Uint8List> bytesFuture) {
Future<Either<FFIResponse, FlowyInternalError>> _extractResponse(
Completer<Uint8List> bytesFuture) {
return bytesFuture.future.then((bytes) {
try {
final response = FFIResponse.fromBuffer(bytes);

View File

@ -843,6 +843,47 @@ dependencies = [
"walkdir",
]
[[package]]
name = "flowy-document"
version = "0.1.0"
dependencies = [
"async-stream",
"bytes",
"chrono",
"color-eyre",
"criterion",
"dart-notify",
"dashmap",
"derive_more",
"diesel",
"diesel_derives",
"flowy-database",
"flowy-derive",
"flowy-document",
"flowy-error",
"flowy-revision",
"flowy-sync",
"flowy-test",
"futures",
"futures-util",
"lib-dispatch",
"lib-infra",
"lib-ot",
"lib-ws",
"log",
"protobuf",
"rand 0.8.5",
"serde",
"serde_json",
"strum",
"strum_macros",
"tokio",
"tracing",
"tracing-subscriber",
"unicode-segmentation",
"url",
]
[[package]]
name = "flowy-error"
version = "0.1.0"
@ -882,13 +923,13 @@ dependencies = [
"diesel_derives",
"flowy-database",
"flowy-derive",
"flowy-document",
"flowy-error",
"flowy-folder",
"flowy-folder-data-model",
"flowy-revision",
"flowy-sync",
"flowy-test",
"flowy-text-block",
"futures",
"lazy_static",
"lib-dispatch",
@ -995,11 +1036,11 @@ dependencies = [
"config",
"dashmap",
"flowy-derive",
"flowy-document",
"flowy-error",
"flowy-folder",
"flowy-folder-data-model",
"flowy-sync",
"flowy-text-block",
"flowy-user",
"futures-util",
"http-flowy",
@ -1036,7 +1077,6 @@ dependencies = [
"flowy-sync",
"futures-util",
"lib-infra",
"lib-ot",
"lib-ws",
"serde",
"serde_json",
@ -1055,13 +1095,13 @@ dependencies = [
"claim 0.5.0",
"color-eyre",
"flowy-database",
"flowy-document",
"flowy-folder",
"flowy-grid",
"flowy-grid-data-model",
"flowy-net",
"flowy-revision",
"flowy-sync",
"flowy-text-block",
"flowy-user",
"futures-core",
"futures-util",
@ -1136,47 +1176,6 @@ dependencies = [
"tokio",
]
[[package]]
name = "flowy-text-block"
version = "0.1.0"
dependencies = [
"async-stream",
"bytes",
"chrono",
"color-eyre",
"criterion",
"dart-notify",
"dashmap",
"derive_more",
"diesel",
"diesel_derives",
"flowy-database",
"flowy-derive",
"flowy-error",
"flowy-revision",
"flowy-sync",
"flowy-test",
"flowy-text-block",
"futures",
"futures-util",
"lib-dispatch",
"lib-infra",
"lib-ot",
"lib-ws",
"log",
"protobuf",
"rand 0.8.5",
"serde",
"serde_json",
"strum",
"strum_macros",
"tokio",
"tracing",
"tracing-subscriber",
"unicode-segmentation",
"url",
]
[[package]]
name = "flowy-user"
version = "0.1.0"

View File

@ -11,7 +11,7 @@ members = [
"flowy-database",
"flowy-folder",
"dart-notify",
"flowy-text-block",
"flowy-document",
"flowy-error",
"flowy-revision",
"flowy-grid",

View File

@ -1,6 +1,6 @@
[package]
name = "flowy-text-block"
name = "flowy-document"
version = "0.1.0"
edition = "2018"
@ -41,7 +41,7 @@ futures = "0.3.15"
[dev-dependencies]
flowy-test = { path = "../flowy-test" }
flowy-text-block = { path = "../flowy-text-block", features = ["flowy_unit_test"]}
flowy-document = { path = "../flowy-document", features = ["flowy_unit_test"]}
derive_more = {version = "0.99", features = ["display"]}
tracing-subscriber = "0.2.0"

View File

@ -1,19 +1,22 @@
use crate::web_socket::EditorCommandSender;
use crate::{
errors::FlowyError,
queue::{EditBlockQueue, EditorCommand},
TextEditorUser,
queue::{EditDocumentQueue, EditorCommand},
DocumentUser,
};
use bytes::Bytes;
use flowy_error::{internal_error, FlowyResult};
use flowy_revision::{RevisionCloudService, RevisionManager, RevisionObjectBuilder, RevisionWebSocket};
use flowy_revision::{
RevisionCloudService, RevisionCompress, RevisionManager, RevisionObjectDeserializer, RevisionObjectSerializer,
RevisionWebSocket,
};
use flowy_sync::entities::ws_data::ServerRevisionWSData;
use flowy_sync::{
entities::{revision::Revision, text_block::DocumentPB},
entities::{document::DocumentPayloadPB, revision::Revision},
errors::CollaborateResult,
util::make_operations_from_revisions,
};
use lib_ot::core::AttributeEntry;
use lib_ot::core::{AttributeEntry, AttributeHashMap};
use lib_ot::{
core::{DeltaOperation, Interval},
text_delta::TextOperations,
@ -22,7 +25,7 @@ use lib_ws::WSConnectState;
use std::sync::Arc;
use tokio::sync::{mpsc, oneshot};
pub struct TextBlockEditor {
pub struct DocumentEditor {
pub doc_id: String,
#[allow(dead_code)]
rev_manager: Arc<RevisionManager>,
@ -31,24 +34,24 @@ pub struct TextBlockEditor {
edit_cmd_tx: EditorCommandSender,
}
impl TextBlockEditor {
impl DocumentEditor {
#[allow(unused_variables)]
pub(crate) async fn new(
doc_id: &str,
user: Arc<dyn TextEditorUser>,
user: Arc<dyn DocumentUser>,
mut rev_manager: RevisionManager,
rev_web_socket: Arc<dyn RevisionWebSocket>,
cloud_service: Arc<dyn RevisionCloudService>,
) -> FlowyResult<Arc<Self>> {
let document_info = rev_manager.load::<TextBlockInfoBuilder>(Some(cloud_service)).await?;
let delta = document_info.delta()?;
let document_info = rev_manager.load::<DocumentRevisionSerde>(Some(cloud_service)).await?;
let operations = TextOperations::from_bytes(&document_info.content)?;
let rev_manager = Arc::new(rev_manager);
let doc_id = doc_id.to_string();
let user_id = user.user_id()?;
let edit_cmd_tx = spawn_edit_queue(user, rev_manager.clone(), delta);
let edit_cmd_tx = spawn_edit_queue(user, rev_manager.clone(), operations);
#[cfg(feature = "sync")]
let ws_manager = crate::web_socket::make_block_ws_manager(
let ws_manager = crate::web_socket::make_document_ws_manager(
doc_id.clone(),
user_id.clone(),
edit_cmd_tx.clone(),
@ -140,22 +143,19 @@ impl TextBlockEditor {
Ok(())
}
pub async fn delta_str(&self) -> FlowyResult<String> {
pub async fn get_operation_str(&self) -> FlowyResult<String> {
let (ret, rx) = oneshot::channel::<CollaborateResult<String>>();
let msg = EditorCommand::ReadDeltaStr { ret };
let msg = EditorCommand::StringifyOperations { ret };
let _ = self.edit_cmd_tx.send(msg).await;
let json = rx.await.map_err(internal_error)??;
Ok(json)
}
#[tracing::instrument(level = "trace", skip(self, data), err)]
pub(crate) async fn compose_local_delta(&self, data: Bytes) -> Result<(), FlowyError> {
let delta = TextOperations::from_bytes(&data)?;
pub(crate) async fn compose_local_operations(&self, data: Bytes) -> Result<(), FlowyError> {
let operations = TextOperations::from_bytes(&data)?;
let (ret, rx) = oneshot::channel::<CollaborateResult<()>>();
let msg = EditorCommand::ComposeLocalDelta {
delta: delta.clone(),
ret,
};
let msg = EditorCommand::ComposeLocalOperations { operations, ret };
let _ = self.edit_cmd_tx.send(msg).await;
let _ = rx.await.map_err(internal_error)??;
Ok(())
@ -186,20 +186,20 @@ impl TextBlockEditor {
pub(crate) fn receive_ws_state(&self, _state: &WSConnectState) {}
}
impl std::ops::Drop for TextBlockEditor {
impl std::ops::Drop for DocumentEditor {
fn drop(&mut self) {
tracing::trace!("{} ClientBlockEditor was dropped", self.doc_id)
tracing::trace!("{} DocumentEditor was dropped", self.doc_id)
}
}
// The edit queue will exit after the EditorCommandSender was dropped.
fn spawn_edit_queue(
user: Arc<dyn TextEditorUser>,
user: Arc<dyn DocumentUser>,
rev_manager: Arc<RevisionManager>,
delta: TextOperations,
) -> EditorCommandSender {
let (sender, receiver) = mpsc::channel(1000);
let edit_queue = EditBlockQueue::new(user, rev_manager, delta, receiver);
let edit_queue = EditDocumentQueue::new(user, rev_manager, delta, receiver);
// We can use tokio::task::spawn_local here by using tokio::spawn_blocking.
// https://github.com/tokio-rs/tokio/issues/2095
// tokio::task::spawn_blocking(move || {
@ -214,10 +214,10 @@ fn spawn_edit_queue(
}
#[cfg(feature = "flowy_unit_test")]
impl TextBlockEditor {
pub async fn text_block_delta(&self) -> FlowyResult<TextOperations> {
impl DocumentEditor {
pub async fn document_operations(&self) -> FlowyResult<TextOperations> {
let (ret, rx) = oneshot::channel::<CollaborateResult<TextOperations>>();
let msg = EditorCommand::ReadDelta { ret };
let msg = EditorCommand::ReadOperations { ret };
let _ = self.edit_cmd_tx.send(msg).await;
let delta = rx.await.map_err(internal_error)??;
Ok(delta)
@ -228,24 +228,38 @@ impl TextBlockEditor {
}
}
struct TextBlockInfoBuilder();
impl RevisionObjectBuilder for TextBlockInfoBuilder {
type Output = DocumentPB;
pub struct DocumentRevisionSerde();
impl RevisionObjectDeserializer for DocumentRevisionSerde {
type Output = DocumentPayloadPB;
fn build_object(object_id: &str, revisions: Vec<Revision>) -> FlowyResult<Self::Output> {
fn deserialize_revisions(object_id: &str, revisions: Vec<Revision>) -> FlowyResult<Self::Output> {
let (base_rev_id, rev_id) = revisions.last().unwrap().pair_rev_id();
let mut delta = make_operations_from_revisions(revisions)?;
correct_delta(&mut delta);
Result::<DocumentPB, FlowyError>::Ok(DocumentPB {
block_id: object_id.to_owned(),
text: delta.json_str(),
Result::<DocumentPayloadPB, FlowyError>::Ok(DocumentPayloadPB {
doc_id: object_id.to_owned(),
content: delta.json_str(),
rev_id,
base_rev_id,
})
}
}
impl RevisionObjectSerializer for DocumentRevisionSerde {
fn serialize_revisions(revisions: Vec<Revision>) -> FlowyResult<Bytes> {
let operations = make_operations_from_revisions::<AttributeHashMap>(revisions)?;
Ok(operations.json_bytes())
}
}
pub(crate) struct DocumentRevisionCompactor();
impl RevisionCompress for DocumentRevisionCompactor {
fn serialize_revisions(&self, revisions: Vec<Revision>) -> FlowyResult<Bytes> {
DocumentRevisionSerde::serialize_revisions(revisions)
}
}
// quill-editor requires the delta should end with '\n' and only contains the
// insert operation. The function, correct_delta maybe be removed in the future.
fn correct_delta(delta: &mut TextOperations) {

View File

@ -32,7 +32,7 @@ impl std::convert::From<i32> for ExportType {
#[derive(Default, ProtoBuf)]
pub struct EditPayloadPB {
#[pb(index = 1)]
pub text_block_id: String,
pub doc_id: String,
// Encode in JSON format
#[pb(index = 2)]
@ -40,35 +40,35 @@ pub struct EditPayloadPB {
// Encode in JSON format
#[pb(index = 3)]
pub delta: String,
pub operations_str: String,
}
#[derive(Default)]
pub struct EditParams {
pub text_block_id: String,
pub doc_id: String,
// Encode in JSON format
pub operations: String,
// Encode in JSON format
pub delta: String,
pub operations_str: String,
}
impl TryInto<EditParams> for EditPayloadPB {
type Error = ErrorCode;
fn try_into(self) -> Result<EditParams, Self::Error> {
Ok(EditParams {
text_block_id: self.text_block_id,
doc_id: self.doc_id,
operations: self.operations,
delta: self.delta,
operations_str: self.operations_str,
})
}
}
#[derive(Default, ProtoBuf)]
pub struct TextBlockPB {
pub struct DocumentSnapshotPB {
#[pb(index = 1)]
pub text_block_id: String,
pub doc_id: String,
/// Encode in JSON format
#[pb(index = 2)]

View File

@ -0,0 +1,43 @@
use crate::entities::{DocumentSnapshotPB, EditParams, EditPayloadPB, ExportDataPB, ExportParams, ExportPayloadPB};
use crate::DocumentManager;
use flowy_error::FlowyError;
use flowy_sync::entities::document::DocumentIdPB;
use lib_dispatch::prelude::{data_result, AppData, Data, DataResult};
use std::convert::TryInto;
use std::sync::Arc;
pub(crate) async fn get_document_handler(
data: Data<DocumentIdPB>,
manager: AppData<Arc<DocumentManager>>,
) -> DataResult<DocumentSnapshotPB, FlowyError> {
let document_id: DocumentIdPB = data.into_inner();
let editor = manager.open_document_editor(&document_id).await?;
let operations_str = editor.get_operation_str().await?;
data_result(DocumentSnapshotPB {
doc_id: document_id.into(),
snapshot: operations_str,
})
}
pub(crate) async fn apply_edit_handler(
data: Data<EditPayloadPB>,
manager: AppData<Arc<DocumentManager>>,
) -> Result<(), FlowyError> {
let params: EditParams = data.into_inner().try_into()?;
let _ = manager.apply_edit(params).await?;
Ok(())
}
#[tracing::instrument(level = "debug", skip(data, manager), err)]
pub(crate) async fn export_handler(
data: Data<ExportPayloadPB>,
manager: AppData<Arc<DocumentManager>>,
) -> DataResult<ExportDataPB, FlowyError> {
let params: ExportParams = data.into_inner().try_into()?;
let editor = manager.open_document_editor(&params.view_id).await?;
let operations_str = editor.get_operation_str().await?;
data_result(ExportDataPB {
data: operations_str,
export_type: params.export_type,
})
}

View File

@ -1,26 +1,26 @@
use crate::event_handler::*;
use crate::TextEditorManager;
use crate::DocumentManager;
use flowy_derive::{Flowy_Event, ProtoBuf_Enum};
use lib_dispatch::prelude::Module;
use std::sync::Arc;
use strum_macros::Display;
pub fn create(block_manager: Arc<TextEditorManager>) -> Module {
let mut module = Module::new().name(env!("CARGO_PKG_NAME")).data(block_manager);
pub fn create(document_manager: Arc<DocumentManager>) -> Module {
let mut module = Module::new().name(env!("CARGO_PKG_NAME")).data(document_manager);
module = module
.event(TextBlockEvent::GetTextBlock, get_text_block_handler)
.event(TextBlockEvent::ApplyEdit, apply_edit_handler)
.event(TextBlockEvent::ExportDocument, export_handler);
.event(DocumentEvent::GetDocument, get_document_handler)
.event(DocumentEvent::ApplyEdit, apply_edit_handler)
.event(DocumentEvent::ExportDocument, export_handler);
module
}
#[derive(Clone, Copy, PartialEq, Eq, Debug, Display, Hash, ProtoBuf_Enum, Flowy_Event)]
#[event_err = "FlowyError"]
pub enum TextBlockEvent {
#[event(input = "TextBlockIdPB", output = "TextBlockPB")]
GetTextBlock = 0,
pub enum DocumentEvent {
#[event(input = "DocumentIdPB", output = "DocumentSnapshotPB")]
GetDocument = 0,
#[event(input = "EditPayloadPB")]
ApplyEdit = 1,

View File

@ -0,0 +1,27 @@
pub mod editor;
mod entities;
mod event_handler;
pub mod event_map;
pub mod manager;
mod queue;
mod web_socket;
pub mod protobuf;
pub use manager::*;
pub mod errors {
pub use flowy_error::{internal_error, ErrorCode, FlowyError};
}
pub const TEXT_BLOCK_SYNC_INTERVAL_IN_MILLIS: u64 = 1000;
use crate::errors::FlowyError;
use flowy_sync::entities::document::{CreateDocumentParams, DocumentIdPB, DocumentPayloadPB, ResetDocumentParams};
use lib_infra::future::FutureResult;
pub trait DocumentCloudService: Send + Sync {
fn create_document(&self, token: &str, params: CreateDocumentParams) -> FutureResult<(), FlowyError>;
fn fetch_document(&self, token: &str, params: DocumentIdPB) -> FutureResult<Option<DocumentPayloadPB>, FlowyError>;
fn update_document_content(&self, token: &str, params: ResetDocumentParams) -> FutureResult<(), FlowyError>;
}

View File

@ -1,47 +1,47 @@
use crate::editor::DocumentRevisionCompactor;
use crate::entities::EditParams;
use crate::queue::TextBlockRevisionCompactor;
use crate::{editor::TextBlockEditor, errors::FlowyError, TextEditorCloudService};
use crate::{editor::DocumentEditor, errors::FlowyError, DocumentCloudService};
use bytes::Bytes;
use dashmap::DashMap;
use flowy_database::ConnectionPool;
use flowy_error::FlowyResult;
use flowy_revision::disk::SQLiteTextBlockRevisionPersistence;
use flowy_revision::disk::SQLiteDocumentRevisionPersistence;
use flowy_revision::{
RevisionCloudService, RevisionManager, RevisionPersistence, RevisionWebSocket, SQLiteRevisionSnapshotPersistence,
};
use flowy_sync::entities::{
document::{DocumentIdPB, DocumentOperationsPB},
revision::{md5, RepeatedRevision, Revision},
text_block::{TextBlockDeltaPB, TextBlockIdPB},
ws_data::ServerRevisionWSData,
};
use lib_infra::future::FutureResult;
use std::{convert::TryInto, sync::Arc};
pub trait TextEditorUser: Send + Sync {
pub trait DocumentUser: Send + Sync {
fn user_dir(&self) -> Result<String, FlowyError>;
fn user_id(&self) -> Result<String, FlowyError>;
fn token(&self) -> Result<String, FlowyError>;
fn db_pool(&self) -> Result<Arc<ConnectionPool>, FlowyError>;
}
pub struct TextEditorManager {
cloud_service: Arc<dyn TextEditorCloudService>,
pub struct DocumentManager {
cloud_service: Arc<dyn DocumentCloudService>,
rev_web_socket: Arc<dyn RevisionWebSocket>,
editor_map: Arc<TextEditorMap>,
user: Arc<dyn TextEditorUser>,
editor_map: Arc<DocumentEditorMap>,
user: Arc<dyn DocumentUser>,
}
impl TextEditorManager {
impl DocumentManager {
pub fn new(
cloud_service: Arc<dyn TextEditorCloudService>,
text_block_user: Arc<dyn TextEditorUser>,
cloud_service: Arc<dyn DocumentCloudService>,
document_user: Arc<dyn DocumentUser>,
rev_web_socket: Arc<dyn RevisionWebSocket>,
) -> Self {
Self {
cloud_service,
rev_web_socket,
editor_map: Arc::new(TextEditorMap::new()),
user: text_block_user,
editor_map: Arc::new(DocumentEditorMap::new()),
user: document_user,
}
}
@ -52,46 +52,49 @@ impl TextEditorManager {
}
#[tracing::instrument(level = "trace", skip(self, editor_id), fields(editor_id), err)]
pub async fn open_text_editor<T: AsRef<str>>(&self, editor_id: T) -> Result<Arc<TextBlockEditor>, FlowyError> {
pub async fn open_document_editor<T: AsRef<str>>(&self, editor_id: T) -> Result<Arc<DocumentEditor>, FlowyError> {
let editor_id = editor_id.as_ref();
tracing::Span::current().record("editor_id", &editor_id);
self.get_text_editor(editor_id).await
self.get_document_editor(editor_id).await
}
#[tracing::instrument(level = "trace", skip(self, editor_id), fields(editor_id), err)]
pub fn close_text_editor<T: AsRef<str>>(&self, editor_id: T) -> Result<(), FlowyError> {
pub fn close_document_editor<T: AsRef<str>>(&self, editor_id: T) -> Result<(), FlowyError> {
let editor_id = editor_id.as_ref();
tracing::Span::current().record("editor_id", &editor_id);
self.editor_map.remove(editor_id);
Ok(())
}
#[tracing::instrument(level = "debug", skip(self, delta), err)]
pub async fn receive_local_delta(&self, delta: TextBlockDeltaPB) -> Result<TextBlockDeltaPB, FlowyError> {
let editor = self.get_text_editor(&delta.text_block_id).await?;
let _ = editor.compose_local_delta(Bytes::from(delta.delta_str)).await?;
let delta_str = editor.delta_str().await?;
Ok(TextBlockDeltaPB {
text_block_id: delta.text_block_id.clone(),
delta_str,
#[tracing::instrument(level = "debug", skip(self, payload), err)]
pub async fn receive_local_operations(
&self,
payload: DocumentOperationsPB,
) -> Result<DocumentOperationsPB, FlowyError> {
let editor = self.get_document_editor(&payload.doc_id).await?;
let _ = editor
.compose_local_operations(Bytes::from(payload.operations_str))
.await?;
let operations_str = editor.get_operation_str().await?;
Ok(DocumentOperationsPB {
doc_id: payload.doc_id.clone(),
operations_str,
})
}
pub async fn apply_edit(&self, params: EditParams) -> FlowyResult<()> {
let editor = self.get_text_editor(&params.text_block_id).await?;
let _ = editor.compose_local_delta(Bytes::from(params.delta)).await?;
let editor = self.get_document_editor(&params.doc_id).await?;
let _ = editor
.compose_local_operations(Bytes::from(params.operations_str))
.await?;
Ok(())
}
pub async fn create_text_block<T: AsRef<str>>(
&self,
text_block_id: T,
revisions: RepeatedRevision,
) -> FlowyResult<()> {
let doc_id = text_block_id.as_ref().to_owned();
pub async fn create_document<T: AsRef<str>>(&self, doc_id: T, revisions: RepeatedRevision) -> FlowyResult<()> {
let doc_id = doc_id.as_ref().to_owned();
let db_pool = self.user.db_pool()?;
// Maybe we could save the block to disk without creating the RevisionManager
let rev_manager = self.make_text_block_rev_manager(&doc_id, db_pool)?;
// Maybe we could save the document to disk without creating the RevisionManager
let rev_manager = self.make_document_rev_manager(&doc_id, db_pool)?;
let _ = rev_manager.reset_object(revisions).await?;
Ok(())
}
@ -113,47 +116,65 @@ impl TextEditorManager {
}
}
impl TextEditorManager {
async fn get_text_editor(&self, block_id: &str) -> FlowyResult<Arc<TextBlockEditor>> {
match self.editor_map.get(block_id) {
impl DocumentManager {
/// Returns the `DocumentEditor`
/// Initializes the document editor if it's not initialized yet. Otherwise, returns the opened
/// editor.
///
/// # Arguments
///
/// * `doc_id`: the id of the document
///
/// returns: Result<Arc<DocumentEditor>, FlowyError>
///
async fn get_document_editor(&self, doc_id: &str) -> FlowyResult<Arc<DocumentEditor>> {
match self.editor_map.get(doc_id) {
None => {
let db_pool = self.user.db_pool()?;
self.make_text_editor(block_id, db_pool).await
self.init_document_editor(doc_id, db_pool).await
}
Some(editor) => Ok(editor),
}
}
/// Initializes a document editor with the doc_id
///
/// # Arguments
///
/// * `doc_id`: the id of the document
/// * `pool`: sqlite connection pool
///
/// returns: Result<Arc<DocumentEditor>, FlowyError>
///
#[tracing::instrument(level = "trace", skip(self, pool), err)]
async fn make_text_editor(
async fn init_document_editor(
&self,
block_id: &str,
doc_id: &str,
pool: Arc<ConnectionPool>,
) -> Result<Arc<TextBlockEditor>, FlowyError> {
) -> Result<Arc<DocumentEditor>, FlowyError> {
let user = self.user.clone();
let token = self.user.token()?;
let rev_manager = self.make_text_block_rev_manager(block_id, pool.clone())?;
let cloud_service = Arc::new(TextBlockRevisionCloudService {
let rev_manager = self.make_document_rev_manager(doc_id, pool.clone())?;
let cloud_service = Arc::new(DocumentRevisionCloudService {
token,
server: self.cloud_service.clone(),
});
let doc_editor =
TextBlockEditor::new(block_id, user, rev_manager, self.rev_web_socket.clone(), cloud_service).await?;
self.editor_map.insert(block_id, &doc_editor);
Ok(doc_editor)
let editor = DocumentEditor::new(doc_id, user, rev_manager, self.rev_web_socket.clone(), cloud_service).await?;
self.editor_map.insert(doc_id, &editor);
Ok(editor)
}
fn make_text_block_rev_manager(
fn make_document_rev_manager(
&self,
doc_id: &str,
pool: Arc<ConnectionPool>,
) -> Result<RevisionManager, FlowyError> {
let user_id = self.user.user_id()?;
let disk_cache = SQLiteTextBlockRevisionPersistence::new(&user_id, pool.clone());
let disk_cache = SQLiteDocumentRevisionPersistence::new(&user_id, pool.clone());
let rev_persistence = RevisionPersistence::new(&user_id, doc_id, disk_cache);
// let history_persistence = SQLiteRevisionHistoryPersistence::new(doc_id, pool.clone());
let snapshot_persistence = SQLiteRevisionSnapshotPersistence::new(doc_id, pool);
let rev_compactor = TextBlockRevisionCompactor();
let rev_compactor = DocumentRevisionCompactor();
Ok(RevisionManager::new(
&user_id,
@ -166,30 +187,30 @@ impl TextEditorManager {
}
}
struct TextBlockRevisionCloudService {
struct DocumentRevisionCloudService {
token: String,
server: Arc<dyn TextEditorCloudService>,
server: Arc<dyn DocumentCloudService>,
}
impl RevisionCloudService for TextBlockRevisionCloudService {
impl RevisionCloudService for DocumentRevisionCloudService {
#[tracing::instrument(level = "trace", skip(self))]
fn fetch_object(&self, user_id: &str, object_id: &str) -> FutureResult<Vec<Revision>, FlowyError> {
let params: TextBlockIdPB = object_id.to_string().into();
let params: DocumentIdPB = object_id.to_string().into();
let server = self.server.clone();
let token = self.token.clone();
let user_id = user_id.to_string();
FutureResult::new(async move {
match server.read_text_block(&token, params).await? {
match server.fetch_document(&token, params).await? {
None => Err(FlowyError::record_not_found().context("Remote doesn't have this document")),
Some(doc) => {
let delta_data = Bytes::from(doc.text.clone());
let doc_md5 = md5(&delta_data);
Some(payload) => {
let bytes = Bytes::from(payload.content.clone());
let doc_md5 = md5(&bytes);
let revision = Revision::new(
&doc.block_id,
doc.base_rev_id,
doc.rev_id,
delta_data,
&payload.doc_id,
payload.base_rev_id,
payload.rev_id,
bytes,
&user_id,
doc_md5,
);
@ -200,23 +221,23 @@ impl RevisionCloudService for TextBlockRevisionCloudService {
}
}
pub struct TextEditorMap {
inner: DashMap<String, Arc<TextBlockEditor>>,
pub struct DocumentEditorMap {
inner: DashMap<String, Arc<DocumentEditor>>,
}
impl TextEditorMap {
impl DocumentEditorMap {
fn new() -> Self {
Self { inner: DashMap::new() }
}
pub(crate) fn insert(&self, editor_id: &str, doc: &Arc<TextBlockEditor>) {
pub(crate) fn insert(&self, editor_id: &str, doc: &Arc<DocumentEditor>) {
if self.inner.contains_key(editor_id) {
log::warn!("Doc:{} already exists in cache", editor_id);
}
self.inner.insert(editor_id.to_string(), doc.clone());
}
pub(crate) fn get(&self, editor_id: &str) -> Option<Arc<TextBlockEditor>> {
pub(crate) fn get(&self, editor_id: &str) -> Option<Arc<DocumentEditor>> {
Some(self.inner.get(editor_id)?.clone())
}
@ -229,7 +250,7 @@ impl TextEditorMap {
}
#[tracing::instrument(level = "trace", skip(web_socket, handlers))]
fn listen_ws_state_changed(web_socket: Arc<dyn RevisionWebSocket>, handlers: Arc<TextEditorMap>) {
fn listen_ws_state_changed(web_socket: Arc<dyn RevisionWebSocket>, handlers: Arc<DocumentEditorMap>) {
tokio::spawn(async move {
let mut notify = web_socket.subscribe_state_changed().await;
while let Ok(state) = notify.recv().await {

View File

@ -1,17 +1,15 @@
use crate::web_socket::EditorCommandReceiver;
use crate::TextEditorUser;
use crate::web_socket::{DocumentResolveOperations, EditorCommandReceiver};
use crate::DocumentUser;
use async_stream::stream;
use bytes::Bytes;
use flowy_error::{FlowyError, FlowyResult};
use flowy_revision::{OperationsMD5, RevisionCompactor, RevisionManager, RichTextTransformDeltas, TransformDeltas};
use flowy_sync::util::make_operations_from_revisions;
use flowy_error::FlowyError;
use flowy_revision::{OperationsMD5, RevisionManager, TransformOperations};
use flowy_sync::{
client_document::{history::UndoResult, ClientDocument},
entities::revision::{RevId, Revision},
errors::CollaborateError,
};
use futures::stream::StreamExt;
use lib_ot::core::{AttributeEntry, AttributeHashMap};
use lib_ot::core::AttributeEntry;
use lib_ot::{
core::{Interval, OperationTransform},
text_delta::TextOperations,
@ -21,21 +19,21 @@ use tokio::sync::{oneshot, RwLock};
// The EditorCommandQueue executes each command that will alter the document in
// serial.
pub(crate) struct EditBlockQueue {
pub(crate) struct EditDocumentQueue {
document: Arc<RwLock<ClientDocument>>,
user: Arc<dyn TextEditorUser>,
user: Arc<dyn DocumentUser>,
rev_manager: Arc<RevisionManager>,
receiver: Option<EditorCommandReceiver>,
}
impl EditBlockQueue {
impl EditDocumentQueue {
pub(crate) fn new(
user: Arc<dyn TextEditorUser>,
user: Arc<dyn DocumentUser>,
rev_manager: Arc<RevisionManager>,
delta: TextOperations,
operations: TextOperations,
receiver: EditorCommandReceiver,
) -> Self {
let document = Arc::new(RwLock::new(ClientDocument::from_operations(delta)));
let document = Arc::new(RwLock::new(ClientDocument::from_operations(operations)));
Self {
document,
user,
@ -67,62 +65,62 @@ impl EditBlockQueue {
#[tracing::instrument(level = "trace", skip(self), err)]
async fn handle_command(&self, command: EditorCommand) -> Result<(), FlowyError> {
match command {
EditorCommand::ComposeLocalDelta { delta, ret } => {
EditorCommand::ComposeLocalOperations { operations, ret } => {
let mut document = self.document.write().await;
let _ = document.compose_operations(delta.clone())?;
let _ = document.compose_operations(operations.clone())?;
let md5 = document.md5();
drop(document);
let _ = self.save_local_delta(delta, md5).await?;
let _ = self.save_local_operations(operations, md5).await?;
let _ = ret.send(Ok(()));
}
EditorCommand::ComposeRemoteDelta { client_delta, ret } => {
EditorCommand::ComposeRemoteOperation { client_operations, ret } => {
let mut document = self.document.write().await;
let _ = document.compose_operations(client_delta.clone())?;
let _ = document.compose_operations(client_operations.clone())?;
let md5 = document.md5();
drop(document);
let _ = ret.send(Ok(md5));
}
EditorCommand::ResetDelta { delta, ret } => {
EditorCommand::ResetOperations { operations, ret } => {
let mut document = self.document.write().await;
let _ = document.set_operations(delta);
let _ = document.set_operations(operations);
let md5 = document.md5();
drop(document);
let _ = ret.send(Ok(md5));
}
EditorCommand::TransformDelta { delta, ret } => {
EditorCommand::TransformOperations { operations, ret } => {
let f = || async {
let read_guard = self.document.read().await;
let mut server_prime: Option<TextOperations> = None;
let client_prime: TextOperations;
let mut server_operations: Option<DocumentResolveOperations> = None;
let client_operations: TextOperations;
if read_guard.is_empty() {
// Do nothing
client_prime = delta;
client_operations = operations;
} else {
let (s_prime, c_prime) = read_guard.get_operations().transform(&delta)?;
client_prime = c_prime;
server_prime = Some(s_prime);
let (s_prime, c_prime) = read_guard.get_operations().transform(&operations)?;
client_operations = c_prime;
server_operations = Some(DocumentResolveOperations(s_prime));
}
drop(read_guard);
Ok::<RichTextTransformDeltas, CollaborateError>(TransformDeltas {
client_prime,
server_prime,
Ok::<TextTransformOperations, CollaborateError>(TransformOperations {
client_operations: DocumentResolveOperations(client_operations),
server_operations,
})
};
let _ = ret.send(f().await);
}
EditorCommand::Insert { index, data, ret } => {
let mut write_guard = self.document.write().await;
let delta = write_guard.insert(index, data)?;
let operations = write_guard.insert(index, data)?;
let md5 = write_guard.md5();
let _ = self.save_local_delta(delta, md5).await?;
let _ = self.save_local_operations(operations, md5).await?;
let _ = ret.send(Ok(()));
}
EditorCommand::Delete { interval, ret } => {
let mut write_guard = self.document.write().await;
let delta = write_guard.delete(interval)?;
let operations = write_guard.delete(interval)?;
let md5 = write_guard.md5();
let _ = self.save_local_delta(delta, md5).await?;
let _ = self.save_local_operations(operations, md5).await?;
let _ = ret.send(Ok(()));
}
EditorCommand::Format {
@ -131,16 +129,16 @@ impl EditBlockQueue {
ret,
} => {
let mut write_guard = self.document.write().await;
let delta = write_guard.format(interval, attribute)?;
let operations = write_guard.format(interval, attribute)?;
let md5 = write_guard.md5();
let _ = self.save_local_delta(delta, md5).await?;
let _ = self.save_local_operations(operations, md5).await?;
let _ = ret.send(Ok(()));
}
EditorCommand::Replace { interval, data, ret } => {
let mut write_guard = self.document.write().await;
let delta = write_guard.replace(interval, data)?;
let operations = write_guard.replace(interval, data)?;
let md5 = write_guard.md5();
let _ = self.save_local_delta(delta, md5).await?;
let _ = self.save_local_operations(operations, md5).await?;
let _ = ret.send(Ok(()));
}
EditorCommand::CanUndo { ret } => {
@ -151,73 +149,60 @@ impl EditBlockQueue {
}
EditorCommand::Undo { ret } => {
let mut write_guard = self.document.write().await;
let UndoResult { operations: delta } = write_guard.undo()?;
let UndoResult { operations } = write_guard.undo()?;
let md5 = write_guard.md5();
let _ = self.save_local_delta(delta, md5).await?;
let _ = self.save_local_operations(operations, md5).await?;
let _ = ret.send(Ok(()));
}
EditorCommand::Redo { ret } => {
let mut write_guard = self.document.write().await;
let UndoResult { operations: delta } = write_guard.redo()?;
let UndoResult { operations } = write_guard.redo()?;
let md5 = write_guard.md5();
let _ = self.save_local_delta(delta, md5).await?;
let _ = self.save_local_operations(operations, md5).await?;
let _ = ret.send(Ok(()));
}
EditorCommand::ReadDeltaStr { ret } => {
EditorCommand::StringifyOperations { ret } => {
let data = self.document.read().await.get_operations_json();
let _ = ret.send(Ok(data));
}
EditorCommand::ReadDelta { ret } => {
let delta = self.document.read().await.get_operations().clone();
let _ = ret.send(Ok(delta));
EditorCommand::ReadOperations { ret } => {
let operations = self.document.read().await.get_operations().clone();
let _ = ret.send(Ok(operations));
}
}
Ok(())
}
async fn save_local_delta(&self, delta: TextOperations, md5: String) -> Result<RevId, FlowyError> {
let delta_data = delta.json_bytes();
async fn save_local_operations(&self, operations: TextOperations, md5: String) -> Result<RevId, FlowyError> {
let bytes = operations.json_bytes();
let (base_rev_id, rev_id) = self.rev_manager.next_rev_id_pair();
let user_id = self.user.user_id()?;
let revision = Revision::new(
&self.rev_manager.object_id,
base_rev_id,
rev_id,
delta_data,
&user_id,
md5,
);
let revision = Revision::new(&self.rev_manager.object_id, base_rev_id, rev_id, bytes, &user_id, md5);
let _ = self.rev_manager.add_local_revision(&revision).await?;
Ok(rev_id.into())
}
}
pub(crate) struct TextBlockRevisionCompactor();
impl RevisionCompactor for TextBlockRevisionCompactor {
fn bytes_from_revisions(&self, revisions: Vec<Revision>) -> FlowyResult<Bytes> {
let delta = make_operations_from_revisions::<AttributeHashMap>(revisions)?;
Ok(delta.json_bytes())
}
}
pub type TextTransformOperations = TransformOperations<DocumentResolveOperations>;
pub(crate) type Ret<T> = oneshot::Sender<Result<T, CollaborateError>>;
pub(crate) enum EditorCommand {
ComposeLocalDelta {
delta: TextOperations,
ComposeLocalOperations {
operations: TextOperations,
ret: Ret<()>,
},
ComposeRemoteDelta {
client_delta: TextOperations,
ComposeRemoteOperation {
client_operations: TextOperations,
ret: Ret<OperationsMD5>,
},
ResetDelta {
delta: TextOperations,
ResetOperations {
operations: TextOperations,
ret: Ret<OperationsMD5>,
},
TransformDelta {
delta: TextOperations,
ret: Ret<RichTextTransformDeltas>,
TransformOperations {
operations: TextOperations,
ret: Ret<TextTransformOperations>,
},
Insert {
index: usize,
@ -250,11 +235,11 @@ pub(crate) enum EditorCommand {
Redo {
ret: Ret<()>,
},
ReadDeltaStr {
StringifyOperations {
ret: Ret<String>,
},
#[allow(dead_code)]
ReadDelta {
ReadOperations {
ret: Ret<TextOperations>,
},
}
@ -262,10 +247,10 @@ pub(crate) enum EditorCommand {
impl std::fmt::Debug for EditorCommand {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
let s = match self {
EditorCommand::ComposeLocalDelta { .. } => "ComposeLocalDelta",
EditorCommand::ComposeRemoteDelta { .. } => "ComposeRemoteDelta",
EditorCommand::ResetDelta { .. } => "ResetDelta",
EditorCommand::TransformDelta { .. } => "TransformDelta",
EditorCommand::ComposeLocalOperations { .. } => "ComposeLocalOperations",
EditorCommand::ComposeRemoteOperation { .. } => "ComposeRemoteOperation",
EditorCommand::ResetOperations { .. } => "ResetOperations",
EditorCommand::TransformOperations { .. } => "TransformOperations",
EditorCommand::Insert { .. } => "Insert",
EditorCommand::Delete { .. } => "Delete",
EditorCommand::Format { .. } => "Format",
@ -274,8 +259,8 @@ impl std::fmt::Debug for EditorCommand {
EditorCommand::CanRedo { .. } => "CanRedo",
EditorCommand::Undo { .. } => "Undo",
EditorCommand::Redo { .. } => "Redo",
EditorCommand::ReadDeltaStr { .. } => "ReadDeltaStr",
EditorCommand::ReadDelta { .. } => "ReadDocumentAsDelta",
EditorCommand::StringifyOperations { .. } => "StringifyOperations",
EditorCommand::ReadOperations { .. } => "ReadOperations",
};
f.write_str(s)
}

View File

@ -1,7 +1,9 @@
use crate::queue::TextTransformOperations;
use crate::{queue::EditorCommand, TEXT_BLOCK_SYNC_INTERVAL_IN_MILLIS};
use bytes::Bytes;
use flowy_error::{internal_error, FlowyError};
use flowy_error::{internal_error, FlowyError, FlowyResult};
use flowy_revision::*;
use flowy_sync::entities::revision::Revision;
use flowy_sync::{
entities::{
revision::RevisionRange,
@ -10,8 +12,8 @@ use flowy_sync::{
errors::CollaborateResult,
};
use lib_infra::future::{BoxResultFuture, FutureResult};
use lib_ot::core::AttributeHashMap;
use flowy_sync::util::make_operations_from_revisions;
use lib_ot::text_delta::TextOperations;
use lib_ws::WSConnectState;
use std::{sync::Arc, time::Duration};
@ -24,8 +26,31 @@ use tokio::sync::{
pub(crate) type EditorCommandSender = Sender<EditorCommand>;
pub(crate) type EditorCommandReceiver = Receiver<EditorCommand>;
#[derive(Clone)]
pub struct DocumentResolveOperations(pub TextOperations);
impl OperationsDeserializer<DocumentResolveOperations> for DocumentResolveOperations {
fn deserialize_revisions(revisions: Vec<Revision>) -> FlowyResult<DocumentResolveOperations> {
Ok(DocumentResolveOperations(make_operations_from_revisions(revisions)?))
}
}
impl OperationsSerializer for DocumentResolveOperations {
fn serialize_operations(&self) -> Bytes {
self.0.json_bytes()
}
}
impl DocumentResolveOperations {
pub fn into_inner(self) -> TextOperations {
self.0
}
}
pub type DocumentConflictController = ConflictController<DocumentResolveOperations>;
#[allow(dead_code)]
pub(crate) async fn make_block_ws_manager(
pub(crate) async fn make_document_ws_manager(
doc_id: String,
user_id: String,
edit_cmd_tx: EditorCommandSender,
@ -33,11 +58,11 @@ pub(crate) async fn make_block_ws_manager(
rev_web_socket: Arc<dyn RevisionWebSocket>,
) -> Arc<RevisionWebSocketManager> {
let ws_data_provider = Arc::new(WSDataProvider::new(&doc_id, Arc::new(rev_manager.clone())));
let resolver = Arc::new(TextBlockConflictResolver { edit_cmd_tx });
let resolver = Arc::new(DocumentConflictResolver { edit_cmd_tx });
let conflict_controller =
RichTextConflictController::new(&user_id, resolver, Arc::new(ws_data_provider.clone()), rev_manager);
let ws_data_stream = Arc::new(TextBlockRevisionWSDataStream::new(conflict_controller));
let ws_data_sink = Arc::new(TextBlockWSDataSink(ws_data_provider));
DocumentConflictController::new(&user_id, resolver, Arc::new(ws_data_provider.clone()), rev_manager);
let ws_data_stream = Arc::new(DocumentRevisionWSDataStream::new(conflict_controller));
let ws_data_sink = Arc::new(DocumentWSDataSink(ws_data_provider));
let ping_duration = Duration::from_millis(TEXT_BLOCK_SYNC_INTERVAL_IN_MILLIS);
let ws_manager = Arc::new(RevisionWebSocketManager::new(
"Block",
@ -65,20 +90,20 @@ fn listen_document_ws_state(_user_id: &str, _doc_id: &str, mut subscriber: broad
});
}
pub(crate) struct TextBlockRevisionWSDataStream {
conflict_controller: Arc<RichTextConflictController>,
pub(crate) struct DocumentRevisionWSDataStream {
conflict_controller: Arc<DocumentConflictController>,
}
impl TextBlockRevisionWSDataStream {
impl DocumentRevisionWSDataStream {
#[allow(dead_code)]
pub fn new(conflict_controller: RichTextConflictController) -> Self {
pub fn new(conflict_controller: DocumentConflictController) -> Self {
Self {
conflict_controller: Arc::new(conflict_controller),
}
}
}
impl RevisionWSDataStream for TextBlockRevisionWSDataStream {
impl RevisionWSDataStream for DocumentRevisionWSDataStream {
fn receive_push_revision(&self, bytes: Bytes) -> BoxResultFuture<(), FlowyError> {
let resolver = self.conflict_controller.clone();
Box::pin(async move { resolver.receive_bytes(bytes).await })
@ -100,64 +125,67 @@ impl RevisionWSDataStream for TextBlockRevisionWSDataStream {
}
}
pub(crate) struct TextBlockWSDataSink(pub(crate) Arc<WSDataProvider>);
impl RevisionWebSocketSink for TextBlockWSDataSink {
pub(crate) struct DocumentWSDataSink(pub(crate) Arc<WSDataProvider>);
impl RevisionWebSocketSink for DocumentWSDataSink {
fn next(&self) -> FutureResult<Option<ClientRevisionWSData>, FlowyError> {
let sink_provider = self.0.clone();
FutureResult::new(async move { sink_provider.next().await })
}
}
struct TextBlockConflictResolver {
struct DocumentConflictResolver {
edit_cmd_tx: EditorCommandSender,
}
impl ConflictResolver<AttributeHashMap> for TextBlockConflictResolver {
fn compose_delta(&self, delta: TextOperations) -> BoxResultFuture<OperationsMD5, FlowyError> {
impl ConflictResolver<DocumentResolveOperations> for DocumentConflictResolver {
fn compose_operations(&self, operations: DocumentResolveOperations) -> BoxResultFuture<OperationsMD5, FlowyError> {
let tx = self.edit_cmd_tx.clone();
let operations = operations.into_inner();
Box::pin(async move {
let (ret, rx) = oneshot::channel();
tx.send(EditorCommand::ComposeRemoteDelta {
client_delta: delta,
tx.send(EditorCommand::ComposeRemoteOperation {
client_operations: operations,
ret,
})
.await
.map_err(internal_error)?;
let md5 = rx.await.map_err(|e| {
FlowyError::internal().context(format!("handle EditorCommand::ComposeRemoteDelta failed: {}", e))
})??;
let md5 = rx
.await
.map_err(|e| FlowyError::internal().context(format!("Compose operations failed: {}", e)))??;
Ok(md5)
})
}
fn transform_delta(
fn transform_operations(
&self,
delta: TextOperations,
) -> BoxResultFuture<flowy_revision::RichTextTransformDeltas, FlowyError> {
operations: DocumentResolveOperations,
) -> BoxResultFuture<TransformOperations<DocumentResolveOperations>, FlowyError> {
let tx = self.edit_cmd_tx.clone();
let operations = operations.into_inner();
Box::pin(async move {
let (ret, rx) = oneshot::channel::<CollaborateResult<RichTextTransformDeltas>>();
tx.send(EditorCommand::TransformDelta { delta, ret })
let (ret, rx) = oneshot::channel::<CollaborateResult<TextTransformOperations>>();
tx.send(EditorCommand::TransformOperations { operations, ret })
.await
.map_err(internal_error)?;
let transform_delta = rx
let transformed_operations = rx
.await
.map_err(|e| FlowyError::internal().context(format!("TransformDelta failed: {}", e)))??;
Ok(transform_delta)
.map_err(|e| FlowyError::internal().context(format!("Transform operations failed: {}", e)))??;
Ok(transformed_operations)
})
}
fn reset_delta(&self, delta: TextOperations) -> BoxResultFuture<OperationsMD5, FlowyError> {
fn reset_operations(&self, operations: DocumentResolveOperations) -> BoxResultFuture<OperationsMD5, FlowyError> {
let tx = self.edit_cmd_tx.clone();
let operations = operations.into_inner();
Box::pin(async move {
let (ret, rx) = oneshot::channel();
let _ = tx
.send(EditorCommand::ResetDelta { delta, ret })
.send(EditorCommand::ResetOperations { operations, ret })
.await
.map_err(internal_error)?;
let md5 = rx.await.map_err(|e| {
FlowyError::internal().context(format!("handle EditorCommand::OverrideDelta failed: {}", e))
})??;
let md5 = rx
.await
.map_err(|e| FlowyError::internal().context(format!("Reset operations failed: {}", e)))??;
Ok(md5)
})
}

View File

@ -1,7 +1,7 @@
use flowy_document::editor::DocumentEditor;
use flowy_document::TEXT_BLOCK_SYNC_INTERVAL_IN_MILLIS;
use flowy_revision::disk::RevisionState;
use flowy_test::{helper::ViewTest, FlowySDKTest};
use flowy_text_block::editor::TextBlockEditor;
use flowy_text_block::TEXT_BLOCK_SYNC_INTERVAL_IN_MILLIS;
use lib_ot::{core::Interval, text_delta::TextOperations};
use std::sync::Arc;
use tokio::time::{sleep, Duration};
@ -17,17 +17,21 @@ pub enum EditorScript {
AssertJson(&'static str),
}
pub struct TextBlockEditorTest {
pub struct DocumentEditorTest {
pub sdk: FlowySDKTest,
pub editor: Arc<TextBlockEditor>,
pub editor: Arc<DocumentEditor>,
}
impl TextBlockEditorTest {
impl DocumentEditorTest {
pub async fn new() -> Self {
let sdk = FlowySDKTest::default();
let _ = sdk.init_user().await;
let test = ViewTest::new_text_block_view(&sdk).await;
let editor = sdk.text_block_manager.open_text_editor(&test.view.id).await.unwrap();
let editor = sdk
.text_block_manager
.open_document_editor(&test.view.id)
.await
.unwrap();
Self { sdk, editor }
}
@ -72,7 +76,7 @@ impl TextBlockEditorTest {
}
EditorScript::AssertJson(expected) => {
let expected_delta: TextOperations = serde_json::from_str(expected).unwrap();
let delta = self.editor.text_block_delta().await.unwrap();
let delta = self.editor.document_operations().await.unwrap();
if expected_delta != delta {
eprintln!("✅ expect: {}", expected,);
eprintln!("❌ receive: {}", delta.json_str());

View File

@ -14,7 +14,7 @@ async fn text_block_sync_current_rev_id_check() {
AssertNextSyncRevId(None),
AssertJson(r#"[{"insert":"123\n"}]"#),
];
TextBlockEditorTest::new().await.run_scripts(scripts).await;
DocumentEditorTest::new().await.run_scripts(scripts).await;
}
#[tokio::test]
@ -28,7 +28,7 @@ async fn text_block_sync_state_check() {
AssertRevisionState(3, RevisionState::Ack),
AssertJson(r#"[{"insert":"123\n"}]"#),
];
TextBlockEditorTest::new().await.run_scripts(scripts).await;
DocumentEditorTest::new().await.run_scripts(scripts).await;
}
#[tokio::test]
@ -40,7 +40,7 @@ async fn text_block_sync_insert_test() {
AssertJson(r#"[{"insert":"123\n"}]"#),
AssertNextSyncRevId(None),
];
TextBlockEditorTest::new().await.run_scripts(scripts).await;
DocumentEditorTest::new().await.run_scripts(scripts).await;
}
#[tokio::test]
@ -52,7 +52,7 @@ async fn text_block_sync_insert_in_chinese() {
InsertText("", offset),
AssertJson(r#"[{"insert":"你好\n"}]"#),
];
TextBlockEditorTest::new().await.run_scripts(scripts).await;
DocumentEditorTest::new().await.run_scripts(scripts).await;
}
#[tokio::test]
@ -64,7 +64,7 @@ async fn text_block_sync_insert_with_emoji() {
InsertText("☺️", offset),
AssertJson(r#"[{"insert":"😁☺️\n"}]"#),
];
TextBlockEditorTest::new().await.run_scripts(scripts).await;
DocumentEditorTest::new().await.run_scripts(scripts).await;
}
#[tokio::test]
@ -76,7 +76,7 @@ async fn text_block_sync_delete_in_english() {
Delete(Interval::new(0, 2)),
AssertJson(r#"[{"insert":"3\n"}]"#),
];
TextBlockEditorTest::new().await.run_scripts(scripts).await;
DocumentEditorTest::new().await.run_scripts(scripts).await;
}
#[tokio::test]
@ -89,7 +89,7 @@ async fn text_block_sync_delete_in_chinese() {
Delete(Interval::new(0, offset)),
AssertJson(r#"[{"insert":"好\n"}]"#),
];
TextBlockEditorTest::new().await.run_scripts(scripts).await;
DocumentEditorTest::new().await.run_scripts(scripts).await;
}
#[tokio::test]
@ -101,5 +101,5 @@ async fn text_block_sync_replace_test() {
Replace(Interval::new(0, 3), "abc"),
AssertJson(r#"[{"insert":"abc\n"}]"#),
];
TextBlockEditorTest::new().await.run_scripts(scripts).await;
DocumentEditorTest::new().await.run_scripts(scripts).await;
}

View File

@ -5,7 +5,7 @@ mod serde_test;
mod undo_redo_test;
use derive_more::Display;
use flowy_sync::client_document::{ClientDocument, InitialDocumentContent};
use flowy_sync::client_document::{ClientDocument, InitialDocument};
use lib_ot::{
core::*,
text_delta::{BuildInTextAttribute, TextOperations},
@ -264,7 +264,7 @@ impl TestBuilder {
}
}
pub fn run_scripts<C: InitialDocumentContent>(mut self, scripts: Vec<TestOp>) {
pub fn run_scripts<C: InitialDocument>(mut self, scripts: Vec<TestOp>) {
self.documents = vec![ClientDocument::new::<C>(), ClientDocument::new::<C>()];
self.primes = vec![None, None];
self.deltas = vec![None, None];

View File

@ -12,7 +12,7 @@ flowy-derive = { path = "../../../shared-lib/flowy-derive" }
lib-ot = { path = "../../../shared-lib/lib-ot" }
lib-infra = { path = "../../../shared-lib/lib-infra" }
flowy-text-block = { path = "../flowy-text-block" }
flowy-document = { path = "../flowy-document" }
flowy-database = { path = "../flowy-database" }
flowy-error = { path = "../flowy-error", features = ["db", "http_server"]}
dart-notify = { path = "../dart-notify" }

View File

@ -14,7 +14,7 @@ use crate::{
use bytes::Bytes;
use flowy_error::FlowyError;
use flowy_folder_data_model::user_default;
use flowy_revision::disk::SQLiteTextBlockRevisionPersistence;
use flowy_revision::disk::SQLiteDocumentRevisionPersistence;
use flowy_revision::{RevisionManager, RevisionPersistence, RevisionWebSocket, SQLiteRevisionSnapshotPersistence};
use flowy_sync::client_document::default::{initial_document_str, initial_read_me};
use flowy_sync::{client_folder::FolderPad, entities::ws_data::ServerRevisionWSData};
@ -164,7 +164,7 @@ impl FolderManager {
let pool = self.persistence.db_pool()?;
let object_id = folder_id.as_ref();
let disk_cache = SQLiteTextBlockRevisionPersistence::new(user_id, pool.clone());
let disk_cache = SQLiteDocumentRevisionPersistence::new(user_id, pool.clone());
let rev_persistence = RevisionPersistence::new(user_id, object_id, disk_cache);
let rev_compactor = FolderRevisionCompactor();
// let history_persistence = SQLiteRevisionHistoryPersistence::new(object_id, pool.clone());

View File

@ -2,7 +2,8 @@ use crate::manager::FolderId;
use bytes::Bytes;
use flowy_error::{FlowyError, FlowyResult};
use flowy_revision::{
RevisionCloudService, RevisionCompactor, RevisionManager, RevisionObjectBuilder, RevisionWebSocket,
RevisionCloudService, RevisionCompress, RevisionManager, RevisionObjectDeserializer, RevisionObjectSerializer,
RevisionWebSocket,
};
use flowy_sync::util::make_operations_from_revisions;
use flowy_sync::{
@ -37,7 +38,7 @@ impl FolderEditor {
let cloud = Arc::new(FolderRevisionCloudService {
token: token.to_string(),
});
let folder = Arc::new(RwLock::new(rev_manager.load::<FolderPadBuilder>(Some(cloud)).await?));
let folder = Arc::new(RwLock::new(rev_manager.load::<FolderRevisionSerde>(Some(cloud)).await?));
let rev_manager = Arc::new(rev_manager);
#[cfg(feature = "sync")]
@ -100,16 +101,30 @@ impl FolderEditor {
}
}
struct FolderPadBuilder();
impl RevisionObjectBuilder for FolderPadBuilder {
struct FolderRevisionSerde();
impl RevisionObjectDeserializer for FolderRevisionSerde {
type Output = FolderPad;
fn build_object(_object_id: &str, revisions: Vec<Revision>) -> FlowyResult<Self::Output> {
fn deserialize_revisions(_object_id: &str, revisions: Vec<Revision>) -> FlowyResult<Self::Output> {
let pad = FolderPad::from_revisions(revisions)?;
Ok(pad)
}
}
impl RevisionObjectSerializer for FolderRevisionSerde {
fn serialize_revisions(revisions: Vec<Revision>) -> FlowyResult<Bytes> {
let operations = make_operations_from_revisions::<EmptyAttributes>(revisions)?;
Ok(operations.json_bytes())
}
}
pub struct FolderRevisionCompactor();
impl RevisionCompress for FolderRevisionCompactor {
fn serialize_revisions(&self, revisions: Vec<Revision>) -> FlowyResult<Bytes> {
FolderRevisionSerde::serialize_revisions(revisions)
}
}
struct FolderRevisionCloudService {
#[allow(dead_code)]
token: String,
@ -128,11 +143,3 @@ impl FolderEditor {
self.rev_manager.clone()
}
}
pub struct FolderRevisionCompactor();
impl RevisionCompactor for FolderRevisionCompactor {
fn bytes_from_revisions(&self, revisions: Vec<Revision>) -> FlowyResult<Bytes> {
let operations = make_operations_from_revisions::<EmptyAttributes>(revisions)?;
Ok(operations.json_bytes())
}
}

View File

@ -3,14 +3,16 @@ use crate::{
event_map::WorkspaceDatabase,
services::persistence::{AppTableSql, TrashTableSql, ViewTableSql, WorkspaceTableSql},
};
use bytes::Bytes;
use flowy_database::kv::KV;
use flowy_error::{FlowyError, FlowyResult};
use flowy_folder_data_model::revision::{AppRevision, FolderRevision, ViewRevision, WorkspaceRevision};
use flowy_revision::disk::SQLiteTextBlockRevisionPersistence;
use flowy_revision::disk::SQLiteDocumentRevisionPersistence;
use flowy_revision::reset::{RevisionResettable, RevisionStructReset};
use flowy_sync::client_folder::make_folder_rev_json_str;
use flowy_sync::entities::revision::Revision;
use flowy_sync::{client_folder::FolderPad, entities::revision::md5};
use lib_ot::core::DeltaBuilder;
use std::sync::Arc;
const V1_MIGRATION: &str = "FOLDER_V1_MIGRATION";
@ -110,7 +112,7 @@ impl FolderMigration {
};
let pool = self.database.db_pool()?;
let disk_cache = SQLiteTextBlockRevisionPersistence::new(&self.user_id, pool);
let disk_cache = SQLiteDocumentRevisionPersistence::new(&self.user_id, pool);
let reset = RevisionStructReset::new(&self.user_id, object, Arc::new(disk_cache));
reset.run().await
}
@ -129,10 +131,11 @@ impl RevisionResettable for FolderRevisionResettable {
&self.folder_id
}
fn target_reset_rev_str(&self, revisions: Vec<Revision>) -> FlowyResult<String> {
fn reset_data(&self, revisions: Vec<Revision>) -> FlowyResult<Bytes> {
let pad = FolderPad::from_revisions(revisions)?;
let json = pad.to_json()?;
Ok(json)
let bytes = DeltaBuilder::new().insert(&json).build().json_bytes();
Ok(bytes)
}
fn default_target_rev_str(&self) -> FlowyResult<String> {

View File

@ -17,7 +17,7 @@ use crate::{
use bytes::Bytes;
use flowy_database::kv::KV;
use flowy_folder_data_model::revision::{gen_view_id, ViewRevision};
use flowy_sync::entities::text_block::TextBlockIdPB;
use flowy_sync::entities::document::DocumentIdPB;
use futures::{FutureExt, StreamExt};
use std::{collections::HashSet, sync::Arc};
@ -193,7 +193,7 @@ impl ViewController {
}
#[tracing::instrument(level = "debug", skip(self,params), fields(doc_id = %params.value), err)]
pub(crate) async fn move_view_to_trash(&self, params: TextBlockIdPB) -> Result<(), FlowyError> {
pub(crate) async fn move_view_to_trash(&self, params: DocumentIdPB) -> Result<(), FlowyError> {
let view_id = params.value;
if let Some(latest_view_id) = KV::get_str(LATEST_VIEW_ID) {
if latest_view_id == view_id {

View File

@ -1,7 +1,10 @@
use crate::services::FOLDER_SYNC_INTERVAL_IN_MILLIS;
use bytes::Bytes;
use flowy_error::FlowyError;
use flowy_error::{FlowyError, FlowyResult};
use flowy_revision::*;
use flowy_sync::entities::revision::Revision;
use flowy_sync::server_folder::FolderOperations;
use flowy_sync::util::make_operations_from_revisions;
use flowy_sync::{
client_folder::FolderPad,
entities::{
@ -10,10 +13,32 @@ use flowy_sync::{
},
};
use lib_infra::future::{BoxResultFuture, FutureResult};
use lib_ot::core::{Delta, EmptyAttributes, OperationTransform};
use lib_ot::core::OperationTransform;
use parking_lot::RwLock;
use std::{sync::Arc, time::Duration};
#[derive(Clone)]
pub struct FolderResolveOperations(pub FolderOperations);
impl OperationsDeserializer<FolderResolveOperations> for FolderResolveOperations {
fn deserialize_revisions(revisions: Vec<Revision>) -> FlowyResult<FolderResolveOperations> {
Ok(FolderResolveOperations(make_operations_from_revisions(revisions)?))
}
}
impl OperationsSerializer for FolderResolveOperations {
fn serialize_operations(&self) -> Bytes {
self.0.json_bytes()
}
}
impl FolderResolveOperations {
pub fn into_inner(self) -> FolderOperations {
self.0
}
}
pub type FolderConflictController = ConflictController<FolderResolveOperations>;
#[allow(dead_code)]
pub(crate) async fn make_folder_ws_manager(
user_id: &str,
@ -25,7 +50,7 @@ pub(crate) async fn make_folder_ws_manager(
let ws_data_provider = Arc::new(WSDataProvider::new(folder_id, Arc::new(rev_manager.clone())));
let resolver = Arc::new(FolderConflictResolver { folder_pad });
let conflict_controller =
ConflictController::<EmptyAttributes>::new(user_id, resolver, Arc::new(ws_data_provider.clone()), rev_manager);
FolderConflictController::new(user_id, resolver, Arc::new(ws_data_provider.clone()), rev_manager);
let ws_data_stream = Arc::new(FolderRevisionWSDataStream::new(conflict_controller));
let ws_data_sink = Arc::new(FolderWSDataSink(ws_data_provider));
let ping_duration = Duration::from_millis(FOLDER_SYNC_INTERVAL_IN_MILLIS);
@ -51,52 +76,57 @@ struct FolderConflictResolver {
folder_pad: Arc<RwLock<FolderPad>>,
}
impl ConflictResolver<EmptyAttributes> for FolderConflictResolver {
fn compose_delta(&self, delta: Delta) -> BoxResultFuture<OperationsMD5, FlowyError> {
impl ConflictResolver<FolderResolveOperations> for FolderConflictResolver {
fn compose_operations(&self, operations: FolderResolveOperations) -> BoxResultFuture<OperationsMD5, FlowyError> {
let operations = operations.into_inner();
let folder_pad = self.folder_pad.clone();
Box::pin(async move {
let md5 = folder_pad.write().compose_remote_operations(delta)?;
let md5 = folder_pad.write().compose_remote_operations(operations)?;
Ok(md5)
})
}
fn transform_delta(&self, delta: Delta) -> BoxResultFuture<TransformDeltas<EmptyAttributes>, FlowyError> {
fn transform_operations(
&self,
operations: FolderResolveOperations,
) -> BoxResultFuture<TransformOperations<FolderResolveOperations>, FlowyError> {
let folder_pad = self.folder_pad.clone();
let operations = operations.into_inner();
Box::pin(async move {
let read_guard = folder_pad.read();
let mut server_prime: Option<Delta> = None;
let client_prime: Delta;
let mut server_operations: Option<FolderResolveOperations> = None;
let client_operations: FolderResolveOperations;
if read_guard.is_empty() {
// Do nothing
client_prime = delta;
client_operations = FolderResolveOperations(operations);
} else {
let (s_prime, c_prime) = read_guard.get_operations().transform(&delta)?;
client_prime = c_prime;
server_prime = Some(s_prime);
let (s_prime, c_prime) = read_guard.get_operations().transform(&operations)?;
client_operations = FolderResolveOperations(c_prime);
server_operations = Some(FolderResolveOperations(s_prime));
}
drop(read_guard);
Ok(TransformDeltas {
client_prime,
server_prime,
Ok(TransformOperations {
client_operations,
server_operations,
})
})
}
fn reset_delta(&self, delta: Delta) -> BoxResultFuture<OperationsMD5, FlowyError> {
fn reset_operations(&self, operations: FolderResolveOperations) -> BoxResultFuture<OperationsMD5, FlowyError> {
let folder_pad = self.folder_pad.clone();
Box::pin(async move {
let md5 = folder_pad.write().reset_folder(delta)?;
let md5 = folder_pad.write().reset_folder(operations.into_inner())?;
Ok(md5)
})
}
}
struct FolderRevisionWSDataStream {
conflict_controller: Arc<PlainTextConflictController>,
conflict_controller: Arc<FolderConflictController>,
}
impl FolderRevisionWSDataStream {
pub fn new(conflict_controller: PlainTextConflictController) -> Self {
pub fn new(conflict_controller: FolderConflictController) -> Self {
Self {
conflict_controller: Arc::new(conflict_controller),
}

View File

@ -18,7 +18,7 @@ use flowy_folder::{errors::ErrorCode, services::folder_editor::FolderEditor};
use flowy_revision::disk::RevisionState;
use flowy_revision::REVISION_WRITE_INTERVAL_IN_MILLIS;
use flowy_sync::entities::text_block::DocumentPB;
use flowy_sync::entities::document::DocumentPayloadPB;
use flowy_test::{event_builder::*, FlowySDKTest};
use std::{sync::Arc, time::Duration};
use tokio::time::sleep;
@ -412,14 +412,14 @@ pub async fn delete_view(sdk: &FlowySDKTest, view_ids: Vec<String>) {
}
#[allow(dead_code)]
pub async fn set_latest_view(sdk: &FlowySDKTest, view_id: &str) -> DocumentPB {
pub async fn set_latest_view(sdk: &FlowySDKTest, view_id: &str) -> DocumentPayloadPB {
let view_id: ViewIdPB = view_id.into();
FolderEventBuilder::new(sdk.clone())
.event(SetLatestView)
.payload(view_id)
.async_send()
.await
.parse::<DocumentPB>()
.parse::<DocumentPayloadPB>()
}
pub async fn read_trash(sdk: &FlowySDKTest) -> RepeatedTrashPB {

View File

@ -2,7 +2,9 @@ use crate::entities::RowPB;
use bytes::Bytes;
use flowy_error::{FlowyError, FlowyResult};
use flowy_grid_data_model::revision::{CellRevision, GridBlockRevision, RowChangeset, RowRevision};
use flowy_revision::{RevisionCloudService, RevisionCompactor, RevisionManager, RevisionObjectBuilder};
use flowy_revision::{
RevisionCloudService, RevisionCompress, RevisionManager, RevisionObjectDeserializer, RevisionObjectSerializer,
};
use flowy_sync::client_grid::{GridBlockRevisionChangeset, GridBlockRevisionPad};
use flowy_sync::entities::revision::Revision;
use flowy_sync::util::make_operations_from_revisions;
@ -30,7 +32,7 @@ impl GridBlockRevisionEditor {
let cloud = Arc::new(GridBlockRevisionCloudService {
token: token.to_owned(),
});
let block_revision_pad = rev_manager.load::<GridBlockRevisionPadBuilder>(Some(cloud)).await?;
let block_revision_pad = rev_manager.load::<GridBlockRevisionSerde>(Some(cloud)).await?;
let pad = Arc::new(RwLock::new(block_revision_pad));
let rev_manager = Arc::new(rev_manager);
let user_id = user_id.to_owned();
@ -192,20 +194,25 @@ impl RevisionCloudService for GridBlockRevisionCloudService {
}
}
struct GridBlockRevisionPadBuilder();
impl RevisionObjectBuilder for GridBlockRevisionPadBuilder {
struct GridBlockRevisionSerde();
impl RevisionObjectDeserializer for GridBlockRevisionSerde {
type Output = GridBlockRevisionPad;
fn build_object(object_id: &str, revisions: Vec<Revision>) -> FlowyResult<Self::Output> {
fn deserialize_revisions(object_id: &str, revisions: Vec<Revision>) -> FlowyResult<Self::Output> {
let pad = GridBlockRevisionPad::from_revisions(object_id, revisions)?;
Ok(pad)
}
}
pub struct GridBlockRevisionCompactor();
impl RevisionCompactor for GridBlockRevisionCompactor {
fn bytes_from_revisions(&self, revisions: Vec<Revision>) -> FlowyResult<Bytes> {
impl RevisionObjectSerializer for GridBlockRevisionSerde {
fn serialize_revisions(revisions: Vec<Revision>) -> FlowyResult<Bytes> {
let operations = make_operations_from_revisions::<EmptyAttributes>(revisions)?;
Ok(operations.json_bytes())
}
}
pub struct GridBlockRevisionCompactor();
impl RevisionCompress for GridBlockRevisionCompactor {
fn serialize_revisions(&self, revisions: Vec<Revision>) -> FlowyResult<Bytes> {
GridBlockRevisionSerde::serialize_revisions(revisions)
}
}

View File

@ -122,7 +122,7 @@ pub trait SelectTypeOptionSharedAction: TypeOptionDataSerializer + Send + Sync {
fn transform_type_option(&mut self, field_type: &FieldType, _type_option_data: String) {
match field_type {
FieldType::Checkbox => {
//add Yes and No options if it's not exist.
//add Yes and No options if it does not exist.
if !self.options().iter().any(|option| option.name == CHECK) {
let check_option = SelectOptionPB::with_color(CHECK, SelectOptionColorPB::Green);
self.mut_options().push(check_option);

View File

@ -16,7 +16,9 @@ use crate::services::row::{make_grid_blocks, make_rows_from_row_revs, GridBlockS
use bytes::Bytes;
use flowy_error::{ErrorCode, FlowyError, FlowyResult};
use flowy_grid_data_model::revision::*;
use flowy_revision::{RevisionCloudService, RevisionCompactor, RevisionManager, RevisionObjectBuilder};
use flowy_revision::{
RevisionCloudService, RevisionCompress, RevisionManager, RevisionObjectDeserializer, RevisionObjectSerializer,
};
use flowy_sync::client_grid::{GridRevisionChangeset, GridRevisionPad, JsonDeserializer};
use flowy_sync::entities::revision::Revision;
use flowy_sync::errors::{CollaborateError, CollaborateResult};
@ -56,7 +58,7 @@ impl GridRevisionEditor {
) -> FlowyResult<Arc<Self>> {
let token = user.token()?;
let cloud = Arc::new(GridRevisionCloudService { token });
let grid_pad = rev_manager.load::<GridPadBuilder>(Some(cloud)).await?;
let grid_pad = rev_manager.load::<GridRevisionSerde>(Some(cloud)).await?;
let rev_manager = Arc::new(rev_manager);
let grid_pad = Arc::new(RwLock::new(grid_pad));
@ -830,16 +832,21 @@ impl GridRevisionEditor {
}
}
pub struct GridPadBuilder();
impl RevisionObjectBuilder for GridPadBuilder {
pub struct GridRevisionSerde();
impl RevisionObjectDeserializer for GridRevisionSerde {
type Output = GridRevisionPad;
fn build_object(_object_id: &str, revisions: Vec<Revision>) -> FlowyResult<Self::Output> {
fn deserialize_revisions(_object_id: &str, revisions: Vec<Revision>) -> FlowyResult<Self::Output> {
let pad = GridRevisionPad::from_revisions(revisions)?;
Ok(pad)
}
}
impl RevisionObjectSerializer for GridRevisionSerde {
fn serialize_revisions(revisions: Vec<Revision>) -> FlowyResult<Bytes> {
let operations = make_operations_from_revisions::<EmptyAttributes>(revisions)?;
Ok(operations.json_bytes())
}
}
struct GridRevisionCloudService {
#[allow(dead_code)]
token: String,
@ -853,10 +860,10 @@ impl RevisionCloudService for GridRevisionCloudService {
}
pub struct GridRevisionCompactor();
impl RevisionCompactor for GridRevisionCompactor {
fn bytes_from_revisions(&self, revisions: Vec<Revision>) -> FlowyResult<Bytes> {
let operations = make_operations_from_revisions::<EmptyAttributes>(revisions)?;
Ok(operations.json_bytes())
impl RevisionCompress for GridRevisionCompactor {
fn serialize_revisions(&self, revisions: Vec<Revision>) -> FlowyResult<Bytes> {
GridRevisionSerde::serialize_revisions(revisions)
}
}

View File

@ -11,15 +11,20 @@ use crate::services::group::{
default_group_configuration, find_group_field, make_group_controller, GroupConfigurationReader,
GroupConfigurationWriter, GroupController, MoveGroupRowContext,
};
use bytes::Bytes;
use flowy_error::{FlowyError, FlowyResult};
use flowy_grid_data_model::revision::{
gen_grid_filter_id, FieldRevision, FieldTypeRevision, FilterConfigurationRevision, GroupConfigurationRevision,
RowChangeset, RowRevision,
};
use flowy_revision::{RevisionCloudService, RevisionManager, RevisionObjectBuilder};
use flowy_revision::{
RevisionCloudService, RevisionCompress, RevisionManager, RevisionObjectDeserializer, RevisionObjectSerializer,
};
use flowy_sync::client_grid::{GridViewRevisionChangeset, GridViewRevisionPad};
use flowy_sync::entities::revision::Revision;
use flowy_sync::util::make_operations_from_revisions;
use lib_infra::future::{wrap_future, AFFuture, FutureResult};
use lib_ot::core::EmptyAttributes;
use std::future::Future;
use std::sync::Arc;
use tokio::sync::RwLock;
@ -49,7 +54,7 @@ impl GridViewRevisionEditor {
let cloud = Arc::new(GridViewRevisionCloudService {
token: token.to_owned(),
});
let view_revision_pad = rev_manager.load::<GridViewRevisionPadBuilder>(Some(cloud)).await?;
let view_revision_pad = rev_manager.load::<GridViewRevisionSerde>(Some(cloud)).await?;
let pad = Arc::new(RwLock::new(view_revision_pad));
let rev_manager = Arc::new(rev_manager);
let group_controller = new_group_controller(
@ -472,16 +477,30 @@ impl RevisionCloudService for GridViewRevisionCloudService {
}
}
struct GridViewRevisionPadBuilder();
impl RevisionObjectBuilder for GridViewRevisionPadBuilder {
pub struct GridViewRevisionSerde();
impl RevisionObjectDeserializer for GridViewRevisionSerde {
type Output = GridViewRevisionPad;
fn build_object(object_id: &str, revisions: Vec<Revision>) -> FlowyResult<Self::Output> {
fn deserialize_revisions(object_id: &str, revisions: Vec<Revision>) -> FlowyResult<Self::Output> {
let pad = GridViewRevisionPad::from_revisions(object_id, revisions)?;
Ok(pad)
}
}
impl RevisionObjectSerializer for GridViewRevisionSerde {
fn serialize_revisions(revisions: Vec<Revision>) -> FlowyResult<Bytes> {
let operations = make_operations_from_revisions::<EmptyAttributes>(revisions)?;
Ok(operations.json_bytes())
}
}
pub struct GridViewRevisionCompactor();
impl RevisionCompress for GridViewRevisionCompactor {
fn serialize_revisions(&self, revisions: Vec<Revision>) -> FlowyResult<Bytes> {
GridViewRevisionSerde::serialize_revisions(revisions)
}
}
struct GroupConfigurationReaderImpl(Arc<RwLock<GridViewRevisionPad>>);
impl GroupConfigurationReader for GroupConfigurationReaderImpl {

View File

@ -4,17 +4,14 @@ use crate::entities::{
};
use crate::manager::GridUser;
use crate::services::grid_editor_task::GridServiceTaskScheduler;
use crate::services::grid_view_editor::GridViewRevisionEditor;
use bytes::Bytes;
use crate::services::grid_view_editor::{GridViewRevisionCompactor, GridViewRevisionEditor};
use dashmap::DashMap;
use flowy_error::FlowyResult;
use flowy_grid_data_model::revision::{FieldRevision, RowChangeset, RowRevision};
use flowy_revision::disk::SQLiteGridViewRevisionPersistence;
use flowy_revision::{RevisionCompactor, RevisionManager, RevisionPersistence, SQLiteRevisionSnapshotPersistence};
use flowy_sync::entities::revision::Revision;
use flowy_sync::util::make_operations_from_revisions;
use flowy_revision::{RevisionManager, RevisionPersistence, SQLiteRevisionSnapshotPersistence};
use lib_infra::future::AFFuture;
use lib_ot::core::EmptyAttributes;
use std::sync::Arc;
type ViewId = String;
@ -264,11 +261,3 @@ pub async fn make_grid_view_rev_manager(user: &Arc<dyn GridUser>, view_id: &str)
snapshot_persistence,
))
}
pub struct GridViewRevisionCompactor();
impl RevisionCompactor for GridViewRevisionCompactor {
fn bytes_from_revisions(&self, revisions: Vec<Revision>) -> FlowyResult<Bytes> {
let operations = make_operations_from_revisions::<EmptyAttributes>(revisions)?;
Ok(operations.json_bytes())
}
}

View File

@ -1,5 +1,6 @@
use crate::manager::GridUser;
use crate::services::persistence::GridDatabase;
use bytes::Bytes;
use flowy_database::kv::KV;
use flowy_error::FlowyResult;
use flowy_grid_data_model::revision::GridRevision;
@ -8,6 +9,7 @@ use flowy_revision::reset::{RevisionResettable, RevisionStructReset};
use flowy_sync::client_grid::{make_grid_rev_json_str, GridRevisionPad};
use flowy_sync::entities::revision::Revision;
use flowy_sync::util::md5;
use lib_ot::core::DeltaBuilder;
use std::sync::Arc;
const V1_MIGRATION: &str = "GRID_V1_MIGRATION";
@ -59,10 +61,11 @@ impl RevisionResettable for GridRevisionResettable {
&self.grid_id
}
fn target_reset_rev_str(&self, revisions: Vec<Revision>) -> FlowyResult<String> {
fn reset_data(&self, revisions: Vec<Revision>) -> FlowyResult<Bytes> {
let pad = GridRevisionPad::from_revisions(revisions)?;
let json = pad.json_str()?;
Ok(json)
let bytes = DeltaBuilder::new().insert(&json).build().json_bytes();
Ok(bytes)
}
fn default_target_rev_str(&self) -> FlowyResult<String> {

View File

@ -6,7 +6,7 @@ use bytes::Bytes;
use flowy_grid::entities::*;
use flowy_grid::services::field::SelectOptionPB;
use flowy_grid::services::field::*;
use flowy_grid::services::grid_editor::{GridPadBuilder, GridRevisionEditor};
use flowy_grid::services::grid_editor::{GridRevisionEditor, GridRevisionSerde};
use flowy_grid::services::row::{CreateRowRevisionPayload, RowRevisionBuilder};
use flowy_grid::services::setting::GridSettingChangesetBuilder;
use flowy_grid_data_model::revision::*;

View File

@ -13,7 +13,7 @@ flowy-sync = { path = "../../../shared-lib/flowy-sync"}
flowy-folder-data-model = { path = "../../../shared-lib/flowy-folder-data-model"}
flowy-folder = { path = "../flowy-folder" }
flowy-user = { path = "../flowy-user" }
flowy-text-block = { path = "../flowy-text-block" }
flowy-document = { path = "../flowy-document" }
lazy_static = "1.4.0"
lib-infra = { path = "../../../shared-lib/lib-infra" }
protobuf = {version = "2.18.0"}

View File

@ -2,45 +2,45 @@ use crate::{
configuration::*,
request::{HttpRequestBuilder, ResponseMiddleware},
};
use flowy_document::DocumentCloudService;
use flowy_error::FlowyError;
use flowy_sync::entities::text_block::{CreateTextBlockParams, DocumentPB, ResetTextBlockParams, TextBlockIdPB};
use flowy_text_block::TextEditorCloudService;
use flowy_sync::entities::document::{CreateDocumentParams, DocumentIdPB, DocumentPayloadPB, ResetDocumentParams};
use http_flowy::response::FlowyResponse;
use lazy_static::lazy_static;
use lib_infra::future::FutureResult;
use std::sync::Arc;
pub struct BlockHttpCloudService {
pub struct DocumentCloudServiceImpl {
config: ClientServerConfiguration,
}
impl BlockHttpCloudService {
impl DocumentCloudServiceImpl {
pub fn new(config: ClientServerConfiguration) -> Self {
Self { config }
}
}
impl TextEditorCloudService for BlockHttpCloudService {
fn create_text_block(&self, token: &str, params: CreateTextBlockParams) -> FutureResult<(), FlowyError> {
impl DocumentCloudService for DocumentCloudServiceImpl {
fn create_document(&self, token: &str, params: CreateDocumentParams) -> FutureResult<(), FlowyError> {
let token = token.to_owned();
let url = self.config.doc_url();
FutureResult::new(async move { create_document_request(&token, params, &url).await })
}
fn read_text_block(&self, token: &str, params: TextBlockIdPB) -> FutureResult<Option<DocumentPB>, FlowyError> {
fn fetch_document(&self, token: &str, params: DocumentIdPB) -> FutureResult<Option<DocumentPayloadPB>, FlowyError> {
let token = token.to_owned();
let url = self.config.doc_url();
FutureResult::new(async move { read_document_request(&token, params, &url).await })
}
fn update_text_block(&self, token: &str, params: ResetTextBlockParams) -> FutureResult<(), FlowyError> {
fn update_document_content(&self, token: &str, params: ResetDocumentParams) -> FutureResult<(), FlowyError> {
let token = token.to_owned();
let url = self.config.doc_url();
FutureResult::new(async move { reset_doc_request(&token, params, &url).await })
}
}
pub async fn create_document_request(token: &str, params: CreateTextBlockParams, url: &str) -> Result<(), FlowyError> {
pub async fn create_document_request(token: &str, params: CreateDocumentParams, url: &str) -> Result<(), FlowyError> {
let _ = request_builder()
.post(url)
.header(HEADER_TOKEN, token)
@ -52,9 +52,9 @@ pub async fn create_document_request(token: &str, params: CreateTextBlockParams,
pub async fn read_document_request(
token: &str,
params: TextBlockIdPB,
params: DocumentIdPB,
url: &str,
) -> Result<Option<DocumentPB>, FlowyError> {
) -> Result<Option<DocumentPayloadPB>, FlowyError> {
let doc = request_builder()
.get(url)
.header(HEADER_TOKEN, token)
@ -65,7 +65,7 @@ pub async fn read_document_request(
Ok(doc)
}
pub async fn reset_doc_request(token: &str, params: ResetTextBlockParams, url: &str) -> Result<(), FlowyError> {
pub async fn reset_doc_request(token: &str, params: ResetDocumentParams, url: &str) -> Result<(), FlowyError> {
let _ = request_builder()
.patch(url)
.header(HEADER_TOKEN, token)

View File

@ -1,6 +1,6 @@
use flowy_sync::entities::revision::{RepeatedRevision, Revision};
use flowy_sync::{
entities::{folder::FolderInfo, text_block::DocumentPB},
entities::{document::DocumentPayloadPB, folder::FolderInfo},
errors::CollaborateError,
server_document::*,
server_folder::FolderCloudPersistence,
@ -29,25 +29,25 @@ pub trait RevisionCloudStorage: Send + Sync {
) -> BoxResultFuture<(), CollaborateError>;
}
pub(crate) struct LocalTextBlockCloudPersistence {
pub(crate) struct LocalDocumentCloudPersistence {
storage: Arc<dyn RevisionCloudStorage>,
}
impl Debug for LocalTextBlockCloudPersistence {
impl Debug for LocalDocumentCloudPersistence {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.write_str("LocalRevisionCloudPersistence")
}
}
impl std::default::Default for LocalTextBlockCloudPersistence {
impl std::default::Default for LocalDocumentCloudPersistence {
fn default() -> Self {
LocalTextBlockCloudPersistence {
LocalDocumentCloudPersistence {
storage: Arc::new(MemoryDocumentCloudStorage::default()),
}
}
}
impl FolderCloudPersistence for LocalTextBlockCloudPersistence {
impl FolderCloudPersistence for LocalDocumentCloudPersistence {
fn read_folder(&self, _user_id: &str, folder_id: &str) -> BoxResultFuture<FolderInfo, CollaborateError> {
let storage = self.storage.clone();
let folder_id = folder_id.to_owned();
@ -109,8 +109,8 @@ impl FolderCloudPersistence for LocalTextBlockCloudPersistence {
}
}
impl TextBlockCloudPersistence for LocalTextBlockCloudPersistence {
fn read_text_block(&self, doc_id: &str) -> BoxResultFuture<DocumentPB, CollaborateError> {
impl DocumentCloudPersistence for LocalDocumentCloudPersistence {
fn read_document(&self, doc_id: &str) -> BoxResultFuture<DocumentPayloadPB, CollaborateError> {
let storage = self.storage.clone();
let doc_id = doc_id.to_owned();
Box::pin(async move {
@ -122,11 +122,11 @@ impl TextBlockCloudPersistence for LocalTextBlockCloudPersistence {
})
}
fn create_text_block(
fn create_document(
&self,
doc_id: &str,
repeated_revision: RepeatedRevision,
) -> BoxResultFuture<Option<DocumentPB>, CollaborateError> {
) -> BoxResultFuture<Option<DocumentPayloadPB>, CollaborateError> {
let doc_id = doc_id.to_owned();
let storage = self.storage.clone();
Box::pin(async move {
@ -135,7 +135,7 @@ impl TextBlockCloudPersistence for LocalTextBlockCloudPersistence {
})
}
fn read_text_block_revisions(
fn read_document_revisions(
&self,
doc_id: &str,
rev_ids: Option<Vec<i64>>,
@ -148,7 +148,7 @@ impl TextBlockCloudPersistence for LocalTextBlockCloudPersistence {
})
}
fn save_text_block_revisions(&self, repeated_revision: RepeatedRevision) -> BoxResultFuture<(), CollaborateError> {
fn save_document_revisions(&self, repeated_revision: RepeatedRevision) -> BoxResultFuture<(), CollaborateError> {
let storage = self.storage.clone();
Box::pin(async move {
let _ = storage.set_revisions(repeated_revision).await?;
@ -156,7 +156,7 @@ impl TextBlockCloudPersistence for LocalTextBlockCloudPersistence {
})
}
fn reset_text_block(&self, doc_id: &str, revisions: RepeatedRevision) -> BoxResultFuture<(), CollaborateError> {
fn reset_document(&self, doc_id: &str, revisions: RepeatedRevision) -> BoxResultFuture<(), CollaborateError> {
let storage = self.storage.clone();
let doc_id = doc_id.to_owned();
Box::pin(async move {

View File

@ -1,4 +1,4 @@
use crate::local_server::persistence::LocalTextBlockCloudPersistence;
use crate::local_server::persistence::LocalDocumentCloudPersistence;
use async_stream::stream;
use bytes::Bytes;
use flowy_error::{internal_error, FlowyError};
@ -6,7 +6,7 @@ use flowy_folder::event_map::FolderCouldServiceV1;
use flowy_sync::{
client_document::default::initial_document_str,
entities::{
text_block::{CreateTextBlockParams, DocumentPB, ResetTextBlockParams, TextBlockIdPB},
document::{CreateDocumentParams, DocumentIdPB, DocumentPayloadPB, ResetDocumentParams},
ws_data::{ClientRevisionWSData, ClientRevisionWSDataType},
},
errors::CollaborateError,
@ -39,7 +39,7 @@ impl LocalServer {
client_ws_sender: mpsc::UnboundedSender<WebSocketRawMessage>,
client_ws_receiver: broadcast::Sender<WebSocketRawMessage>,
) -> Self {
let persistence = Arc::new(LocalTextBlockCloudPersistence::default());
let persistence = Arc::new(LocalDocumentCloudPersistence::default());
let doc_manager = Arc::new(ServerDocumentManager::new(persistence.clone()));
let folder_manager = Arc::new(ServerFolderManager::new(persistence));
let stop_tx = RwLock::new(None);
@ -252,6 +252,7 @@ impl RevisionUser for LocalRevisionUser {
}
}
use flowy_document::DocumentCloudService;
use flowy_folder::entities::{
app::{AppIdPB, CreateAppParams, UpdateAppParams},
trash::RepeatedTrashIdPB,
@ -261,7 +262,6 @@ use flowy_folder::entities::{
use flowy_folder_data_model::revision::{
gen_app_id, gen_workspace_id, AppRevision, TrashRevision, ViewRevision, WorkspaceRevision,
};
use flowy_text_block::TextEditorCloudService;
use flowy_user::entities::{
SignInParams, SignInResponse, SignUpParams, SignUpResponse, UpdateUserProfileParams, UserProfilePB,
};
@ -414,22 +414,26 @@ impl UserCloudService for LocalServer {
}
}
impl TextEditorCloudService for LocalServer {
fn create_text_block(&self, _token: &str, _params: CreateTextBlockParams) -> FutureResult<(), FlowyError> {
impl DocumentCloudService for LocalServer {
fn create_document(&self, _token: &str, _params: CreateDocumentParams) -> FutureResult<(), FlowyError> {
FutureResult::new(async { Ok(()) })
}
fn read_text_block(&self, _token: &str, params: TextBlockIdPB) -> FutureResult<Option<DocumentPB>, FlowyError> {
let doc = DocumentPB {
block_id: params.value,
text: initial_document_str(),
fn fetch_document(
&self,
_token: &str,
params: DocumentIdPB,
) -> FutureResult<Option<DocumentPayloadPB>, FlowyError> {
let doc = DocumentPayloadPB {
doc_id: params.value,
content: initial_document_str(),
rev_id: 0,
base_rev_id: 0,
};
FutureResult::new(async { Ok(Some(doc)) })
}
fn update_text_block(&self, _token: &str, _params: ResetTextBlockParams) -> FutureResult<(), FlowyError> {
fn update_document_content(&self, _token: &str, _params: ResetDocumentParams) -> FutureResult<(), FlowyError> {
FutureResult::new(async { Ok(()) })
}
}

View File

@ -7,7 +7,6 @@ edition = "2018"
[dependencies]
flowy-sync = { path = "../../../shared-lib/flowy-sync" }
lib-ot = { path = "../../../shared-lib/lib-ot" }
lib-ws = { path = "../../../shared-lib/lib-ws" }
lib-infra = { path = "../../../shared-lib/lib-infra" }
flowy-database = { path = "../flowy-database" }
@ -26,4 +25,4 @@ async-stream = "0.3.2"
serde_json = {version = "1.0"}
[features]
flowy_unit_test = ["lib-ot/flowy_unit_test"]
flowy_unit_test = []

View File

@ -15,12 +15,12 @@ use flowy_sync::{
};
use std::sync::Arc;
pub struct SQLiteTextBlockRevisionPersistence {
pub struct SQLiteDocumentRevisionPersistence {
user_id: String,
pub(crate) pool: Arc<ConnectionPool>,
}
impl RevisionDiskCache for SQLiteTextBlockRevisionPersistence {
impl RevisionDiskCache for SQLiteDocumentRevisionPersistence {
type Error = FlowyError;
fn create_revision_records(&self, revision_records: Vec<RevisionRecord>) -> Result<(), Self::Error> {
@ -81,7 +81,7 @@ impl RevisionDiskCache for SQLiteTextBlockRevisionPersistence {
}
}
impl SQLiteTextBlockRevisionPersistence {
impl SQLiteDocumentRevisionPersistence {
pub fn new(user_id: &str, pool: Arc<ConnectionPool>) -> Self {
Self {
user_id: user_id.to_owned(),

View File

@ -1,17 +1,18 @@
use crate::disk::{RevisionDiskCache, RevisionRecord};
use crate::{RevisionLoader, RevisionPersistence};
use bytes::Bytes;
use flowy_database::kv::KV;
use flowy_error::{FlowyError, FlowyResult};
use flowy_sync::entities::revision::Revision;
use lib_ot::core::DeltaBuilder;
use serde::{Deserialize, Serialize};
use std::str::FromStr;
use std::sync::Arc;
pub trait RevisionResettable {
fn target_id(&self) -> &str;
// String in json format
fn target_reset_rev_str(&self, revisions: Vec<Revision>) -> FlowyResult<String>;
fn reset_data(&self, revisions: Vec<Revision>) -> FlowyResult<Bytes>;
// String in json format
fn default_target_rev_str(&self) -> FlowyResult<String>;
@ -69,9 +70,8 @@ where
.load()
.await?;
let s = self.target.target_reset_rev_str(revisions)?;
let delta_data = DeltaBuilder::new().insert(&s).build().json_bytes();
let revision = Revision::initial_revision(&self.user_id, self.target.target_id(), delta_data);
let bytes = self.target.reset_data(revisions)?;
let revision = Revision::initial_revision(&self.user_id, self.target.target_id(), bytes);
let record = RevisionRecord::new(revision);
tracing::trace!("Reset {} revision record object", self.target.target_id());

View File

@ -1,28 +1,39 @@
use crate::RevisionManager;
use bytes::Bytes;
use flowy_error::{FlowyError, FlowyResult};
use flowy_sync::{
entities::{
revision::{RepeatedRevision, Revision, RevisionRange},
ws_data::ServerRevisionWSDataType,
},
util::make_operations_from_revisions,
use flowy_sync::entities::{
revision::{RepeatedRevision, Revision, RevisionRange},
ws_data::ServerRevisionWSDataType,
};
use lib_infra::future::BoxResultFuture;
use lib_ot::core::{AttributeHashMap, DeltaOperations, EmptyAttributes, OperationAttributes};
use serde::de::DeserializeOwned;
use std::{convert::TryFrom, sync::Arc};
pub type OperationsMD5 = String;
pub trait ConflictResolver<T>
pub struct TransformOperations<Operations> {
pub client_operations: Operations,
pub server_operations: Option<Operations>,
}
pub trait OperationsDeserializer<T>: Send + Sync {
fn deserialize_revisions(revisions: Vec<Revision>) -> FlowyResult<T>;
}
pub trait OperationsSerializer: Send + Sync {
fn serialize_operations(&self) -> Bytes;
}
pub struct ConflictOperations<T>(T);
pub trait ConflictResolver<Operations>
where
T: OperationAttributes + Send + Sync,
Operations: Send + Sync,
{
fn compose_delta(&self, delta: DeltaOperations<T>) -> BoxResultFuture<OperationsMD5, FlowyError>;
fn transform_delta(&self, delta: DeltaOperations<T>) -> BoxResultFuture<TransformDeltas<T>, FlowyError>;
fn reset_delta(&self, delta: DeltaOperations<T>) -> BoxResultFuture<OperationsMD5, FlowyError>;
fn compose_operations(&self, operations: Operations) -> BoxResultFuture<OperationsMD5, FlowyError>;
fn transform_operations(
&self,
operations: Operations,
) -> BoxResultFuture<TransformOperations<Operations>, FlowyError>;
fn reset_operations(&self, operations: Operations) -> BoxResultFuture<OperationsMD5, FlowyError>;
}
pub trait ConflictRevisionSink: Send + Sync + 'static {
@ -30,26 +41,23 @@ pub trait ConflictRevisionSink: Send + Sync + 'static {
fn ack(&self, rev_id: String, ty: ServerRevisionWSDataType) -> BoxResultFuture<(), FlowyError>;
}
pub type RichTextConflictController = ConflictController<AttributeHashMap>;
pub type PlainTextConflictController = ConflictController<EmptyAttributes>;
pub struct ConflictController<T>
pub struct ConflictController<Operations>
where
T: OperationAttributes + Send + Sync,
Operations: Send + Sync,
{
user_id: String,
resolver: Arc<dyn ConflictResolver<T> + Send + Sync>,
resolver: Arc<dyn ConflictResolver<Operations> + Send + Sync>,
rev_sink: Arc<dyn ConflictRevisionSink>,
rev_manager: Arc<RevisionManager>,
}
impl<T> ConflictController<T>
impl<Operations> ConflictController<Operations>
where
T: OperationAttributes + Send + Sync + DeserializeOwned + serde::Serialize,
Operations: Clone + Send + Sync,
{
pub fn new(
user_id: &str,
resolver: Arc<dyn ConflictResolver<T> + Send + Sync>,
resolver: Arc<dyn ConflictResolver<Operations> + Send + Sync>,
rev_sink: Arc<dyn ConflictRevisionSink>,
rev_manager: Arc<RevisionManager>,
) -> Self {
@ -61,7 +69,12 @@ where
rev_manager,
}
}
}
impl<Operations> ConflictController<Operations>
where
Operations: OperationsSerializer + OperationsDeserializer<Operations> + Clone + Send + Sync,
{
pub async fn receive_bytes(&self, bytes: Bytes) -> FlowyResult<()> {
let repeated_revision = RepeatedRevision::try_from(bytes)?;
if repeated_revision.is_empty() {
@ -103,33 +116,32 @@ where
}
}
let new_delta = make_operations_from_revisions(revisions.clone())?;
let new_operations = Operations::deserialize_revisions(revisions.clone())?;
let TransformOperations {
client_operations,
server_operations,
} = self.resolver.transform_operations(new_operations).await?;
let TransformDeltas {
client_prime,
server_prime,
} = self.resolver.transform_delta(new_delta).await?;
match server_prime {
match server_operations {
None => {
// The server_prime is None means the client local revisions conflict with the
// // server, and it needs to override the client delta.
let md5 = self.resolver.reset_delta(client_prime).await?;
let md5 = self.resolver.reset_operations(client_operations).await?;
let repeated_revision = RepeatedRevision::new(revisions);
assert_eq!(repeated_revision.last().unwrap().md5, md5);
let _ = self.rev_manager.reset_object(repeated_revision).await?;
Ok(None)
}
Some(server_prime) => {
let md5 = self.resolver.compose_delta(client_prime.clone()).await?;
Some(server_operations) => {
let md5 = self.resolver.compose_operations(client_operations.clone()).await?;
for revision in &revisions {
let _ = self.rev_manager.add_remote_revision(revision).await?;
}
let (client_revision, server_revision) = make_client_and_server_revision(
&self.user_id,
&self.rev_manager,
client_prime,
Some(server_prime),
client_operations,
Some(server_operations),
md5,
);
let _ = self.rev_manager.add_remote_revision(&client_revision).await?;
@ -139,48 +151,26 @@ where
}
}
fn make_client_and_server_revision<T>(
fn make_client_and_server_revision<Operations>(
user_id: &str,
rev_manager: &Arc<RevisionManager>,
client_delta: DeltaOperations<T>,
server_delta: Option<DeltaOperations<T>>,
client_operations: Operations,
server_operations: Option<Operations>,
md5: String,
) -> (Revision, Option<Revision>)
where
T: OperationAttributes + serde::Serialize,
Operations: OperationsSerializer,
{
let (base_rev_id, rev_id) = rev_manager.next_rev_id_pair();
let client_revision = Revision::new(
&rev_manager.object_id,
base_rev_id,
rev_id,
client_delta.json_bytes(),
user_id,
md5.clone(),
);
let bytes = client_operations.serialize_operations();
let client_revision = Revision::new(&rev_manager.object_id, base_rev_id, rev_id, bytes, user_id, md5.clone());
match server_delta {
match server_operations {
None => (client_revision, None),
Some(server_delta) => {
let server_revision = Revision::new(
&rev_manager.object_id,
base_rev_id,
rev_id,
server_delta.json_bytes(),
user_id,
md5,
);
Some(operations) => {
let bytes = operations.serialize_operations();
let server_revision = Revision::new(&rev_manager.object_id, base_rev_id, rev_id, bytes, user_id, md5);
(client_revision, Some(server_revision))
}
}
}
pub type RichTextTransformDeltas = TransformDeltas<AttributeHashMap>;
pub struct TransformDeltas<T>
where
T: OperationAttributes,
{
pub client_prime: DeltaOperations<T>,
pub server_prime: Option<DeltaOperations<T>>,
}

View File

@ -9,21 +9,48 @@ use flowy_sync::{
use lib_infra::future::FutureResult;
use std::sync::Arc;
pub type SyncObject = lib_ot::text_delta::TextOperations;
pub trait RevisionCloudService: Send + Sync {
/// Read the object's revision from remote
/// Returns a list of revisions that used to build the object
/// # Arguments
///
/// * `user_id`: the id of the user
/// * `object_id`: the id of the object
///
fn fetch_object(&self, user_id: &str, object_id: &str) -> FutureResult<Vec<Revision>, FlowyError>;
}
pub trait RevisionObjectBuilder: Send + Sync {
pub trait RevisionObjectDeserializer: Send + Sync {
type Output;
fn build_object(object_id: &str, revisions: Vec<Revision>) -> FlowyResult<Self::Output>;
/// Deserialize the list of revisions into an concrete object type.
///
/// # Arguments
///
/// * `object_id`: the id of the object
/// * `revisions`: a list of revisions that represent the object
///
fn deserialize_revisions(object_id: &str, revisions: Vec<Revision>) -> FlowyResult<Self::Output>;
}
pub trait RevisionCompactor: Send + Sync {
fn compact(&self, user_id: &str, object_id: &str, mut revisions: Vec<Revision>) -> FlowyResult<Revision> {
pub trait RevisionObjectSerializer: Send + Sync {
/// Serialize the list of revisions to `Bytes`
///
/// * `revisions`: a list of revisions will be serialized to `Bytes`
///
fn serialize_revisions(revisions: Vec<Revision>) -> FlowyResult<Bytes>;
}
/// `RevisionCompress` is used to compress multiple revisions into one revision
///
pub trait RevisionCompress: Send + Sync {
fn compress_revisions(
&self,
user_id: &str,
object_id: &str,
mut revisions: Vec<Revision>,
) -> FlowyResult<Revision> {
if revisions.is_empty() {
return Err(FlowyError::internal().context("Can't compact the empty folder's revisions"));
return Err(FlowyError::internal().context("Can't compact the empty revisions"));
}
if revisions.len() == 1 {
@ -35,11 +62,11 @@ pub trait RevisionCompactor: Send + Sync {
let (base_rev_id, rev_id) = first_revision.pair_rev_id();
let md5 = last_revision.md5.clone();
let bytes = self.bytes_from_revisions(revisions)?;
let bytes = self.serialize_revisions(revisions)?;
Ok(Revision::new(object_id, base_rev_id, rev_id, bytes, user_id, md5))
}
fn bytes_from_revisions(&self, revisions: Vec<Revision>) -> FlowyResult<Bytes>;
fn serialize_revisions(&self, revisions: Vec<Revision>) -> FlowyResult<Bytes>;
}
pub struct RevisionManager {
@ -49,7 +76,7 @@ pub struct RevisionManager {
rev_persistence: Arc<RevisionPersistence>,
#[allow(dead_code)]
rev_snapshot: Arc<RevisionSnapshotManager>,
rev_compactor: Arc<dyn RevisionCompactor>,
rev_compress: Arc<dyn RevisionCompress>,
#[cfg(feature = "flowy_unit_test")]
rev_ack_notifier: tokio::sync::broadcast::Sender<i64>,
}
@ -64,13 +91,11 @@ impl RevisionManager {
) -> Self
where
SP: 'static + RevisionSnapshotDiskCache,
C: 'static + RevisionCompactor,
C: 'static + RevisionCompress,
{
let rev_id_counter = RevIdCounter::new(0);
let rev_compactor = Arc::new(rev_compactor);
let rev_persistence = Arc::new(rev_persistence);
let rev_snapshot = Arc::new(RevisionSnapshotManager::new(user_id, object_id, snapshot_persistence));
#[cfg(feature = "flowy_unit_test")]
let (revision_ack_notifier, _) = tokio::sync::broadcast::channel(1);
@ -81,7 +106,7 @@ impl RevisionManager {
rev_id_counter,
rev_persistence,
rev_snapshot,
rev_compactor,
rev_compress: rev_compactor,
#[cfg(feature = "flowy_unit_test")]
rev_ack_notifier: revision_ack_notifier,
}
@ -90,7 +115,7 @@ impl RevisionManager {
#[tracing::instrument(level = "debug", skip_all, fields(object_id) err)]
pub async fn load<B>(&mut self, cloud: Option<Arc<dyn RevisionCloudService>>) -> FlowyResult<B::Output>
where
B: RevisionObjectBuilder,
B: RevisionObjectDeserializer,
{
let (revisions, rev_id) = RevisionLoader {
object_id: self.object_id.clone(),
@ -102,7 +127,7 @@ impl RevisionManager {
.await?;
self.rev_id_counter.set(rev_id);
tracing::Span::current().record("object_id", &self.object_id.as_str());
B::build_object(&self.object_id, revisions)
B::deserialize_revisions(&self.object_id, revisions)
}
#[tracing::instrument(level = "debug", skip(self, revisions), err)]
@ -116,7 +141,7 @@ impl RevisionManager {
#[tracing::instrument(level = "debug", skip(self, revision), err)]
pub async fn add_remote_revision(&self, revision: &Revision) -> Result<(), FlowyError> {
if revision.bytes.is_empty() {
return Err(FlowyError::internal().context("Delta data should be empty"));
return Err(FlowyError::internal().context("Remote revisions is empty"));
}
let _ = self.rev_persistence.add_ack_revision(revision).await?;
@ -128,11 +153,11 @@ impl RevisionManager {
#[tracing::instrument(level = "debug", skip_all, err)]
pub async fn add_local_revision(&self, revision: &Revision) -> Result<(), FlowyError> {
if revision.bytes.is_empty() {
return Err(FlowyError::internal().context("Delta data should be empty"));
return Err(FlowyError::internal().context("Local revisions is empty"));
}
let rev_id = self
.rev_persistence
.add_sync_revision(revision, &self.rev_compactor)
.add_sync_revision(revision, &self.rev_compress)
.await?;
// self.rev_history.add_revision(revision).await;
self.rev_id_counter.set(rev_id);

View File

@ -1,10 +1,10 @@
use crate::cache::{
disk::{RevisionChangeset, RevisionDiskCache, SQLiteTextBlockRevisionPersistence},
disk::{RevisionChangeset, RevisionDiskCache, SQLiteDocumentRevisionPersistence},
memory::RevisionMemoryCacheDelegate,
};
use crate::disk::{RevisionRecord, RevisionState, SQLiteGridBlockRevisionPersistence};
use crate::memory::RevisionMemoryCache;
use crate::RevisionCompactor;
use crate::RevisionCompress;
use flowy_database::ConnectionPool;
use flowy_error::{internal_error, FlowyError, FlowyResult};
use flowy_sync::entities::revision::{Revision, RevisionRange};
@ -71,7 +71,7 @@ impl RevisionPersistence {
pub(crate) async fn add_sync_revision<'a>(
&'a self,
revision: &'a Revision,
compactor: &Arc<dyn RevisionCompactor + 'a>,
rev_compress: &Arc<dyn RevisionCompress + 'a>,
) -> FlowyResult<i64> {
let mut sync_seq_write_guard = self.sync_seq.write().await;
let result = sync_seq_write_guard.compact();
@ -93,7 +93,7 @@ impl RevisionPersistence {
revisions.push(revision.clone());
// compact multiple revisions into one
let compact_revision = compactor.compact(&self.user_id, &self.object_id, revisions)?;
let compact_revision = rev_compress.compress_revisions(&self.user_id, &self.object_id, revisions)?;
let rev_id = compact_revision.rev_id;
tracing::Span::current().record("rev_id", &rev_id);
@ -228,7 +228,7 @@ pub fn mk_text_block_revision_disk_cache(
user_id: &str,
pool: Arc<ConnectionPool>,
) -> Arc<dyn RevisionDiskCache<Error = FlowyError>> {
Arc::new(SQLiteTextBlockRevisionPersistence::new(user_id, pool))
Arc::new(SQLiteDocumentRevisionPersistence::new(user_id, pool))
}
pub fn mk_grid_block_revision_disk_cache(

View File

@ -14,7 +14,7 @@ flowy-folder = { path = "../flowy-folder", default-features = false }
flowy-grid = { path = "../flowy-grid", default-features = false }
flowy-grid-data-model = { path = "../../../shared-lib/flowy-grid-data-model" }
flowy-database = { path = "../flowy-database" }
flowy-text-block = { path = "../flowy-text-block", default-features = false }
flowy-document = { path = "../flowy-document", default-features = false }
flowy-revision = { path = "../flowy-revision" }
tracing = { version = "0.1" }
@ -38,8 +38,8 @@ tokio = { version = "1", features = ["full"] }
futures-util = "0.3.15"
[features]
http_sync = ["flowy-folder/cloud_sync", "flowy-text-block/cloud_sync"]
native_sync = ["flowy-folder/cloud_sync", "flowy-text-block/cloud_sync"]
http_sync = ["flowy-folder/cloud_sync", "flowy-document/cloud_sync"]
native_sync = ["flowy-folder/cloud_sync", "flowy-document/cloud_sync"]
use_bunyan = ["lib-log/use_bunyan"]
dart = [
"flowy-user/dart",
@ -47,6 +47,6 @@ dart = [
"flowy-folder/dart",
"flowy-sync/dart",
"flowy-grid/dart",
"flowy-text-block/dart",
"flowy-document/dart",
]
openssl_vendored = ["flowy-database/openssl_vendored"]

View File

@ -1,37 +1,37 @@
use bytes::Bytes;
use flowy_database::ConnectionPool;
use flowy_document::{
errors::{internal_error, FlowyError},
DocumentCloudService, DocumentManager, DocumentUser,
};
use flowy_net::ClientServerConfiguration;
use flowy_net::{
http_server::document::BlockHttpCloudService, local_server::LocalServer, ws::connection::FlowyWebSocketConnect,
http_server::document::DocumentCloudServiceImpl, local_server::LocalServer, ws::connection::FlowyWebSocketConnect,
};
use flowy_revision::{RevisionWebSocket, WSStateReceiver};
use flowy_sync::entities::ws_data::ClientRevisionWSData;
use flowy_text_block::{
errors::{internal_error, FlowyError},
TextEditorCloudService, TextEditorManager, TextEditorUser,
};
use flowy_user::services::UserSession;
use futures_core::future::BoxFuture;
use lib_infra::future::BoxResultFuture;
use lib_ws::{WSChannel, WSMessageReceiver, WebSocketRawMessage};
use std::{convert::TryInto, path::Path, sync::Arc};
pub struct TextBlockDepsResolver();
impl TextBlockDepsResolver {
pub struct DocumentDepsResolver();
impl DocumentDepsResolver {
pub fn resolve(
local_server: Option<Arc<LocalServer>>,
ws_conn: Arc<FlowyWebSocketConnect>,
user_session: Arc<UserSession>,
server_config: &ClientServerConfiguration,
) -> Arc<TextEditorManager> {
) -> Arc<DocumentManager> {
let user = Arc::new(BlockUserImpl(user_session));
let rev_web_socket = Arc::new(TextBlockWebSocket(ws_conn.clone()));
let cloud_service: Arc<dyn TextEditorCloudService> = match local_server {
None => Arc::new(BlockHttpCloudService::new(server_config.clone())),
let rev_web_socket = Arc::new(DocumentRevisionWebSocket(ws_conn.clone()));
let cloud_service: Arc<dyn DocumentCloudService> = match local_server {
None => Arc::new(DocumentCloudServiceImpl::new(server_config.clone())),
Some(local_server) => local_server,
};
let manager = Arc::new(TextEditorManager::new(cloud_service, user, rev_web_socket));
let manager = Arc::new(DocumentManager::new(cloud_service, user, rev_web_socket));
let receiver = Arc::new(DocumentWSMessageReceiverImpl(manager.clone()));
ws_conn.add_ws_message_receiver(receiver).unwrap();
@ -40,7 +40,7 @@ impl TextBlockDepsResolver {
}
struct BlockUserImpl(Arc<UserSession>);
impl TextEditorUser for BlockUserImpl {
impl DocumentUser for BlockUserImpl {
fn user_dir(&self) -> Result<String, FlowyError> {
let dir = self.0.user_dir().map_err(|e| FlowyError::unauthorized().context(e))?;
@ -64,8 +64,8 @@ impl TextEditorUser for BlockUserImpl {
}
}
struct TextBlockWebSocket(Arc<FlowyWebSocketConnect>);
impl RevisionWebSocket for TextBlockWebSocket {
struct DocumentRevisionWebSocket(Arc<FlowyWebSocketConnect>);
impl RevisionWebSocket for DocumentRevisionWebSocket {
fn send(&self, data: ClientRevisionWSData) -> BoxResultFuture<(), FlowyError> {
let bytes: Bytes = data.try_into().unwrap();
let msg = WebSocketRawMessage {
@ -90,7 +90,7 @@ impl RevisionWebSocket for TextBlockWebSocket {
}
}
struct DocumentWSMessageReceiverImpl(Arc<TextEditorManager>);
struct DocumentWSMessageReceiverImpl(Arc<DocumentManager>);
impl WSMessageReceiver for DocumentWSMessageReceiverImpl {
fn source(&self) -> WSChannel {
WSChannel::Document

View File

@ -1,5 +1,6 @@
use bytes::Bytes;
use flowy_database::ConnectionPool;
use flowy_document::DocumentManager;
use flowy_folder::entities::{ViewDataTypePB, ViewLayoutTypePB};
use flowy_folder::manager::{ViewDataProcessor, ViewDataProcessorMap};
use flowy_folder::{
@ -19,7 +20,6 @@ use flowy_revision::{RevisionWebSocket, WSStateReceiver};
use flowy_sync::client_document::default::initial_document_str;
use flowy_sync::entities::revision::{RepeatedRevision, Revision};
use flowy_sync::entities::ws_data::ClientRevisionWSData;
use flowy_text_block::TextEditorManager;
use flowy_user::services::UserSession;
use futures_core::future::BoxFuture;
use lib_infra::future::{BoxResultFuture, FutureResult};
@ -35,12 +35,12 @@ impl FolderDepsResolver {
user_session: Arc<UserSession>,
server_config: &ClientServerConfiguration,
ws_conn: &Arc<FlowyWebSocketConnect>,
text_block_manager: &Arc<TextEditorManager>,
text_block_manager: &Arc<DocumentManager>,
grid_manager: &Arc<GridManager>,
) -> Arc<FolderManager> {
let user: Arc<dyn WorkspaceUser> = Arc::new(WorkspaceUserImpl(user_session.clone()));
let database: Arc<dyn WorkspaceDatabase> = Arc::new(WorkspaceDatabaseImpl(user_session));
let web_socket = Arc::new(FolderWebSocket(ws_conn.clone()));
let web_socket = Arc::new(FolderRevisionWebSocket(ws_conn.clone()));
let cloud_service: Arc<dyn FolderCouldServiceV1> = match local_server {
None => Arc::new(FolderHttpCloudService::new(server_config.clone())),
Some(local_server) => local_server,
@ -64,12 +64,12 @@ impl FolderDepsResolver {
}
fn make_view_data_processor(
text_block_manager: Arc<TextEditorManager>,
text_block_manager: Arc<DocumentManager>,
grid_manager: Arc<GridManager>,
) -> ViewDataProcessorMap {
let mut map: HashMap<ViewDataTypePB, Arc<dyn ViewDataProcessor + Send + Sync>> = HashMap::new();
let block_data_impl = TextBlockViewDataProcessor(text_block_manager);
let block_data_impl = DocumentViewDataProcessor(text_block_manager);
map.insert(block_data_impl.data_type(), Arc::new(block_data_impl));
let grid_data_impl = GridViewDataProcessor(grid_manager);
@ -96,8 +96,8 @@ impl WorkspaceUser for WorkspaceUserImpl {
}
}
struct FolderWebSocket(Arc<FlowyWebSocketConnect>);
impl RevisionWebSocket for FolderWebSocket {
struct FolderRevisionWebSocket(Arc<FlowyWebSocketConnect>);
impl RevisionWebSocket for FolderRevisionWebSocket {
fn send(&self, data: ClientRevisionWSData) -> BoxResultFuture<(), FlowyError> {
let bytes: Bytes = data.try_into().unwrap();
let msg = WebSocketRawMessage {
@ -136,8 +136,8 @@ impl WSMessageReceiver for FolderWSMessageReceiverImpl {
}
}
struct TextBlockViewDataProcessor(Arc<TextEditorManager>);
impl ViewDataProcessor for TextBlockViewDataProcessor {
struct DocumentViewDataProcessor(Arc<DocumentManager>);
impl ViewDataProcessor for DocumentViewDataProcessor {
fn initialize(&self) -> FutureResult<(), FlowyError> {
let manager = self.0.clone();
FutureResult::new(async move { manager.init() })
@ -156,7 +156,7 @@ impl ViewDataProcessor for TextBlockViewDataProcessor {
let view_id = view_id.to_string();
let manager = self.0.clone();
FutureResult::new(async move {
let _ = manager.create_text_block(view_id, repeated_revision).await?;
let _ = manager.create_document(view_id, repeated_revision).await?;
Ok(())
})
}
@ -165,7 +165,7 @@ impl ViewDataProcessor for TextBlockViewDataProcessor {
let manager = self.0.clone();
let view_id = view_id.to_string();
FutureResult::new(async move {
let _ = manager.close_text_editor(view_id)?;
let _ = manager.close_document_editor(view_id)?;
Ok(())
})
}
@ -174,8 +174,8 @@ impl ViewDataProcessor for TextBlockViewDataProcessor {
let view_id = view_id.to_string();
let manager = self.0.clone();
FutureResult::new(async move {
let editor = manager.open_text_editor(view_id).await?;
let delta_bytes = Bytes::from(editor.delta_str().await?);
let editor = manager.open_document_editor(view_id).await?;
let delta_bytes = Bytes::from(editor.get_operation_str().await?);
Ok(delta_bytes)
})
}
@ -195,7 +195,7 @@ impl ViewDataProcessor for TextBlockViewDataProcessor {
let delta_data = Bytes::from(view_data);
let repeated_revision: RepeatedRevision =
Revision::initial_revision(&user_id, &view_id, delta_data.clone()).into();
let _ = manager.create_text_block(view_id, repeated_revision).await?;
let _ = manager.create_document(view_id, repeated_revision).await?;
Ok(delta_data)
})
}

View File

@ -18,7 +18,7 @@ pub struct GridDepsResolver();
impl GridDepsResolver {
pub async fn resolve(ws_conn: Arc<FlowyWebSocketConnect>, user_session: Arc<UserSession>) -> Arc<GridManager> {
let user = Arc::new(GridUserImpl(user_session.clone()));
let rev_web_socket = Arc::new(GridWebSocket(ws_conn));
let rev_web_socket = Arc::new(GridRevisionWebSocket(ws_conn));
let grid_manager = Arc::new(GridManager::new(
user.clone(),
rev_web_socket,
@ -58,8 +58,8 @@ impl GridUser for GridUserImpl {
}
}
struct GridWebSocket(Arc<FlowyWebSocketConnect>);
impl RevisionWebSocket for GridWebSocket {
struct GridRevisionWebSocket(Arc<FlowyWebSocketConnect>);
impl RevisionWebSocket for GridRevisionWebSocket {
fn send(&self, data: ClientRevisionWSData) -> BoxResultFuture<(), FlowyError> {
let bytes: Bytes = data.try_into().unwrap();
let msg = WebSocketRawMessage {

View File

@ -1,10 +1,10 @@
mod document_deps;
mod folder_deps;
mod grid_deps;
mod text_block_deps;
mod user_deps;
mod util;
pub use document_deps::*;
pub use folder_deps::*;
pub use grid_deps::*;
pub use text_block_deps::*;
pub use user_deps::*;

View File

@ -3,6 +3,7 @@ pub mod module;
pub use flowy_net::get_client_server_configuration;
use crate::deps_resolve::*;
use flowy_document::DocumentManager;
use flowy_folder::{errors::FlowyError, manager::FolderManager};
use flowy_grid::manager::GridManager;
use flowy_net::ClientServerConfiguration;
@ -11,7 +12,6 @@ use flowy_net::{
local_server::LocalServer,
ws::connection::{listen_on_websocket, FlowyWebSocketConnect},
};
use flowy_text_block::TextEditorManager;
use flowy_user::services::{notifier::UserStatus, UserSession, UserSessionConfig};
use lib_dispatch::prelude::*;
use lib_dispatch::runtime::tokio_default_runtime;
@ -67,7 +67,7 @@ fn crate_log_filter(level: String) -> String {
filters.push(format!("flowy_sdk={}", level));
filters.push(format!("flowy_folder={}", level));
filters.push(format!("flowy_user={}", level));
filters.push(format!("flowy_text_block={}", level));
filters.push(format!("flowy_document={}", level));
filters.push(format!("flowy_grid={}", level));
filters.push(format!("flowy_collaboration={}", "info"));
filters.push(format!("dart_notify={}", level));
@ -89,7 +89,7 @@ pub struct FlowySDK {
#[allow(dead_code)]
config: FlowySDKConfig,
pub user_session: Arc<UserSession>,
pub text_block_manager: Arc<TextEditorManager>,
pub text_block_manager: Arc<DocumentManager>,
pub folder_manager: Arc<FolderManager>,
pub grid_manager: Arc<GridManager>,
pub dispatcher: Arc<EventDispatcher>,
@ -106,7 +106,7 @@ impl FlowySDK {
let (local_server, ws_conn) = mk_local_server(&config.server_config);
let (user_session, text_block_manager, folder_manager, local_server, grid_manager) = runtime.block_on(async {
let user_session = mk_user_session(&config, &local_server, &config.server_config);
let text_block_manager = TextBlockDepsResolver::resolve(
let text_block_manager = DocumentDepsResolver::resolve(
local_server.clone(),
ws_conn.clone(),
user_session.clone(),

View File

@ -1,7 +1,7 @@
use flowy_document::DocumentManager;
use flowy_folder::manager::FolderManager;
use flowy_grid::manager::GridManager;
use flowy_net::ws::connection::FlowyWebSocketConnect;
use flowy_text_block::TextEditorManager;
use flowy_user::services::UserSession;
use lib_dispatch::prelude::Module;
use std::sync::Arc;
@ -11,7 +11,7 @@ pub fn mk_modules(
folder_manager: &Arc<FolderManager>,
grid_manager: &Arc<GridManager>,
user_session: &Arc<UserSession>,
text_block_manager: &Arc<TextEditorManager>,
text_block_manager: &Arc<DocumentManager>,
) -> Vec<Module> {
let user_module = mk_user_module(user_session.clone());
let folder_module = mk_folder_module(folder_manager.clone());
@ -43,6 +43,6 @@ fn mk_grid_module(grid_manager: Arc<GridManager>) -> Module {
flowy_grid::event_map::create(grid_manager)
}
fn mk_text_block_module(text_block_manager: Arc<TextEditorManager>) -> Module {
flowy_text_block::event_map::create(text_block_manager)
fn mk_text_block_module(text_block_manager: Arc<DocumentManager>) -> Module {
flowy_document::event_map::create(text_block_manager)
}

View File

@ -1,43 +0,0 @@
use crate::entities::{EditParams, EditPayloadPB, ExportDataPB, ExportParams, ExportPayloadPB, TextBlockPB};
use crate::TextEditorManager;
use flowy_error::FlowyError;
use flowy_sync::entities::text_block::TextBlockIdPB;
use lib_dispatch::prelude::{data_result, AppData, Data, DataResult};
use std::convert::TryInto;
use std::sync::Arc;
pub(crate) async fn get_text_block_handler(
data: Data<TextBlockIdPB>,
manager: AppData<Arc<TextEditorManager>>,
) -> DataResult<TextBlockPB, FlowyError> {
let text_block_id: TextBlockIdPB = data.into_inner();
let editor = manager.open_text_editor(&text_block_id).await?;
let delta_str = editor.delta_str().await?;
data_result(TextBlockPB {
text_block_id: text_block_id.into(),
snapshot: delta_str,
})
}
pub(crate) async fn apply_edit_handler(
data: Data<EditPayloadPB>,
manager: AppData<Arc<TextEditorManager>>,
) -> Result<(), FlowyError> {
let params: EditParams = data.into_inner().try_into()?;
let _ = manager.apply_edit(params).await?;
Ok(())
}
#[tracing::instrument(level = "debug", skip(data, manager), err)]
pub(crate) async fn export_handler(
data: Data<ExportPayloadPB>,
manager: AppData<Arc<TextEditorManager>>,
) -> DataResult<ExportDataPB, FlowyError> {
let params: ExportParams = data.into_inner().try_into()?;
let editor = manager.open_text_editor(&params.view_id).await?;
let delta_json = editor.delta_str().await?;
data_result(ExportDataPB {
data: delta_json,
export_type: params.export_type,
})
}

View File

@ -1,27 +0,0 @@
pub mod editor;
mod entities;
mod event_handler;
pub mod event_map;
pub mod manager;
mod queue;
mod web_socket;
pub mod protobuf;
pub use manager::*;
pub mod errors {
pub use flowy_error::{internal_error, ErrorCode, FlowyError};
}
pub const TEXT_BLOCK_SYNC_INTERVAL_IN_MILLIS: u64 = 1000;
use crate::errors::FlowyError;
use flowy_sync::entities::text_block::{CreateTextBlockParams, DocumentPB, ResetTextBlockParams, TextBlockIdPB};
use lib_infra::future::FutureResult;
pub trait TextEditorCloudService: Send + Sync {
fn create_text_block(&self, token: &str, params: CreateTextBlockParams) -> FutureResult<(), FlowyError>;
fn read_text_block(&self, token: &str, params: TextBlockIdPB) -> FutureResult<Option<DocumentPB>, FlowyError>;
fn update_text_block(&self, token: &str, params: ResetTextBlockParams) -> FutureResult<(), FlowyError>;
}

View File

@ -10,19 +10,19 @@ use bytes::Bytes;
use lib_ot::{core::*, text_delta::TextOperations};
use tokio::sync::mpsc;
pub trait InitialDocumentContent {
pub trait InitialDocument {
fn json_str() -> String;
}
pub struct EmptyDoc();
impl InitialDocumentContent for EmptyDoc {
impl InitialDocument for EmptyDoc {
fn json_str() -> String {
TextOperations::default().json_str()
}
}
pub struct NewlineDoc();
impl InitialDocumentContent for NewlineDoc {
impl InitialDocument for NewlineDoc {
fn json_str() -> String {
initial_document_str()
}
@ -37,7 +37,7 @@ pub struct ClientDocument {
}
impl ClientDocument {
pub fn new<C: InitialDocumentContent>() -> Self {
pub fn new<C: InitialDocument>() -> Self {
let content = C::json_str();
Self::from_json(&content).unwrap()
}

View File

@ -3,24 +3,24 @@ use crate::{
errors::CollaborateError,
};
use flowy_derive::ProtoBuf;
use lib_ot::{errors::OTError, text_delta::TextOperations};
use lib_ot::text_delta::TextOperations;
#[derive(ProtoBuf, Default, Debug, Clone)]
pub struct CreateTextBlockParams {
pub struct CreateDocumentParams {
#[pb(index = 1)]
pub id: String,
pub doc_id: String,
#[pb(index = 2)]
pub revisions: RepeatedRevision,
}
#[derive(ProtoBuf, Default, Debug, Clone, Eq, PartialEq)]
pub struct DocumentPB {
pub struct DocumentPayloadPB {
#[pb(index = 1)]
pub block_id: String,
pub doc_id: String,
#[pb(index = 2)]
pub text: String,
pub content: String,
#[pb(index = 3)]
pub rev_id: i64,
@ -29,14 +29,7 @@ pub struct DocumentPB {
pub base_rev_id: i64,
}
impl DocumentPB {
pub fn delta(&self) -> Result<TextOperations, OTError> {
let delta = TextOperations::from_bytes(&self.text)?;
Ok(delta)
}
}
impl std::convert::TryFrom<Revision> for DocumentPB {
impl std::convert::TryFrom<Revision> for DocumentPayloadPB {
type Error = CollaborateError;
fn try_from(revision: Revision) -> Result<Self, Self::Error> {
@ -48,9 +41,9 @@ impl std::convert::TryFrom<Revision> for DocumentPB {
let delta = TextOperations::from_bytes(&revision.bytes)?;
let doc_json = delta.json_str();
Ok(DocumentPB {
block_id: revision.object_id,
text: doc_json,
Ok(DocumentPayloadPB {
doc_id: revision.object_id,
content: doc_json,
rev_id: revision.rev_id,
base_rev_id: revision.base_rev_id,
})
@ -58,21 +51,21 @@ impl std::convert::TryFrom<Revision> for DocumentPB {
}
#[derive(ProtoBuf, Default, Debug, Clone)]
pub struct ResetTextBlockParams {
pub struct ResetDocumentParams {
#[pb(index = 1)]
pub block_id: String,
pub doc_id: String,
#[pb(index = 2)]
pub revisions: RepeatedRevision,
}
#[derive(ProtoBuf, Default, Debug, Clone)]
pub struct TextBlockDeltaPB {
pub struct DocumentOperationsPB {
#[pb(index = 1)]
pub text_block_id: String,
pub doc_id: String,
#[pb(index = 2)]
pub delta_str: String,
pub operations_str: String,
}
#[derive(ProtoBuf, Default, Debug, Clone)]
@ -88,30 +81,30 @@ pub struct NewDocUserPB {
}
#[derive(ProtoBuf, Default, Debug, Clone)]
pub struct TextBlockIdPB {
pub struct DocumentIdPB {
#[pb(index = 1)]
pub value: String,
}
impl AsRef<str> for TextBlockIdPB {
impl AsRef<str> for DocumentIdPB {
fn as_ref(&self) -> &str {
&self.value
}
}
impl std::convert::From<String> for TextBlockIdPB {
impl std::convert::From<String> for DocumentIdPB {
fn from(value: String) -> Self {
TextBlockIdPB { value }
DocumentIdPB { value }
}
}
impl std::convert::From<TextBlockIdPB> for String {
fn from(block_id: TextBlockIdPB) -> Self {
impl std::convert::From<DocumentIdPB> for String {
fn from(block_id: DocumentIdPB) -> Self {
block_id.value
}
}
impl std::convert::From<&String> for TextBlockIdPB {
impl std::convert::From<&String> for DocumentIdPB {
fn from(s: &String) -> Self {
TextBlockIdPB { value: s.to_owned() }
DocumentIdPB { value: s.to_owned() }
}
}

View File

@ -1,5 +1,5 @@
pub mod document;
pub mod folder;
pub mod parser;
pub mod revision;
pub mod text_block;
pub mod ws_data;

View File

@ -1,6 +1,6 @@
use crate::entities::revision::{RepeatedRevision, Revision};
use crate::{
entities::{text_block::DocumentPB, ws_data::ServerRevisionWSDataBuilder},
entities::{document::DocumentPayloadPB, ws_data::ServerRevisionWSDataBuilder},
errors::{internal_error, CollaborateError, CollaborateResult},
protobuf::ClientRevisionWSData,
server_document::document_pad::ServerDocument,
@ -19,41 +19,41 @@ use tokio::{
task::spawn_blocking,
};
pub trait TextBlockCloudPersistence: Send + Sync + Debug {
fn read_text_block(&self, doc_id: &str) -> BoxResultFuture<DocumentPB, CollaborateError>;
pub trait DocumentCloudPersistence: Send + Sync + Debug {
fn read_document(&self, doc_id: &str) -> BoxResultFuture<DocumentPayloadPB, CollaborateError>;
fn create_text_block(
fn create_document(
&self,
doc_id: &str,
repeated_revision: RepeatedRevision,
) -> BoxResultFuture<Option<DocumentPB>, CollaborateError>;
) -> BoxResultFuture<Option<DocumentPayloadPB>, CollaborateError>;
fn read_text_block_revisions(
fn read_document_revisions(
&self,
doc_id: &str,
rev_ids: Option<Vec<i64>>,
) -> BoxResultFuture<Vec<Revision>, CollaborateError>;
fn save_text_block_revisions(&self, repeated_revision: RepeatedRevision) -> BoxResultFuture<(), CollaborateError>;
fn save_document_revisions(&self, repeated_revision: RepeatedRevision) -> BoxResultFuture<(), CollaborateError>;
fn reset_text_block(
fn reset_document(
&self,
doc_id: &str,
repeated_revision: RepeatedRevision,
) -> BoxResultFuture<(), CollaborateError>;
}
impl RevisionSyncPersistence for Arc<dyn TextBlockCloudPersistence> {
impl RevisionSyncPersistence for Arc<dyn DocumentCloudPersistence> {
fn read_revisions(
&self,
object_id: &str,
rev_ids: Option<Vec<i64>>,
) -> BoxResultFuture<Vec<Revision>, CollaborateError> {
(**self).read_text_block_revisions(object_id, rev_ids)
(**self).read_document_revisions(object_id, rev_ids)
}
fn save_revisions(&self, repeated_revision: RepeatedRevision) -> BoxResultFuture<(), CollaborateError> {
(**self).save_text_block_revisions(repeated_revision)
(**self).save_document_revisions(repeated_revision)
}
fn reset_object(
@ -61,17 +61,17 @@ impl RevisionSyncPersistence for Arc<dyn TextBlockCloudPersistence> {
object_id: &str,
repeated_revision: RepeatedRevision,
) -> BoxResultFuture<(), CollaborateError> {
(**self).reset_text_block(object_id, repeated_revision)
(**self).reset_document(object_id, repeated_revision)
}
}
pub struct ServerDocumentManager {
document_handlers: Arc<RwLock<HashMap<String, Arc<OpenDocumentHandler>>>>,
persistence: Arc<dyn TextBlockCloudPersistence>,
persistence: Arc<dyn DocumentCloudPersistence>,
}
impl ServerDocumentManager {
pub fn new(persistence: Arc<dyn TextBlockCloudPersistence>) -> Self {
pub fn new(persistence: Arc<dyn DocumentCloudPersistence>) -> Self {
Self {
document_handlers: Arc::new(RwLock::new(HashMap::new())),
persistence,
@ -154,7 +154,7 @@ impl ServerDocumentManager {
}
let mut write_guard = self.document_handlers.write().await;
match self.persistence.read_text_block(doc_id).await {
match self.persistence.read_document(doc_id).await {
Ok(doc) => {
let handler = self.create_document_handler(doc).await.map_err(internal_error).unwrap();
write_guard.insert(doc_id.to_owned(), handler.clone());
@ -170,7 +170,7 @@ impl ServerDocumentManager {
doc_id: &str,
repeated_revision: RepeatedRevision,
) -> Result<Arc<OpenDocumentHandler>, CollaborateError> {
match self.persistence.create_text_block(doc_id, repeated_revision).await? {
match self.persistence.create_document(doc_id, repeated_revision).await? {
None => Err(CollaborateError::internal().context("Create document info from revisions failed")),
Some(doc) => {
let handler = self.create_document_handler(doc).await?;
@ -184,7 +184,10 @@ impl ServerDocumentManager {
}
#[tracing::instrument(level = "debug", skip(self, doc), err)]
async fn create_document_handler(&self, doc: DocumentPB) -> Result<Arc<OpenDocumentHandler>, CollaborateError> {
async fn create_document_handler(
&self,
doc: DocumentPayloadPB,
) -> Result<Arc<OpenDocumentHandler>, CollaborateError> {
let persistence = self.persistence.clone();
let handle = spawn_blocking(|| OpenDocumentHandler::new(doc, persistence))
.await
@ -208,16 +211,16 @@ struct OpenDocumentHandler {
}
impl OpenDocumentHandler {
fn new(doc: DocumentPB, persistence: Arc<dyn TextBlockCloudPersistence>) -> Result<Self, CollaborateError> {
let doc_id = doc.block_id.clone();
fn new(doc: DocumentPayloadPB, persistence: Arc<dyn DocumentCloudPersistence>) -> Result<Self, CollaborateError> {
let doc_id = doc.doc_id.clone();
let (sender, receiver) = mpsc::channel(1000);
let users = DashMap::new();
let operations = TextOperations::from_bytes(&doc.text)?;
let operations = TextOperations::from_bytes(&doc.content)?;
let sync_object = ServerDocument::from_operations(&doc_id, operations);
let synchronizer = Arc::new(DocumentRevisionSynchronizer::new(doc.rev_id, sync_object, persistence));
let queue = DocumentCommandRunner::new(&doc.block_id, receiver, synchronizer);
let queue = DocumentCommandRunner::new(&doc.doc_id, receiver, synchronizer);
tokio::task::spawn(queue.run());
Ok(Self { doc_id, sender, users })
}

View File

@ -1,5 +1,5 @@
use crate::synchronizer::RevisionOperations;
use crate::{client_document::InitialDocumentContent, errors::CollaborateError, synchronizer::RevisionSyncObject};
use crate::{client_document::InitialDocument, errors::CollaborateError, synchronizer::RevisionSyncObject};
use lib_ot::{core::*, text_delta::TextOperations};
pub struct ServerDocument {
@ -9,7 +9,7 @@ pub struct ServerDocument {
impl ServerDocument {
#[allow(dead_code)]
pub fn new<C: InitialDocumentContent>(doc_id: &str) -> Self {
pub fn new<C: InitialDocument>(doc_id: &str) -> Self {
let operations = TextOperations::from_json(&C::json_str()).unwrap();
Self::from_operations(doc_id, operations)
}

View File

@ -1,9 +1,9 @@
use crate::server_folder::FolderOperations;
use crate::{
entities::{
document::DocumentPayloadPB,
folder::FolderInfo,
revision::{RepeatedRevision, Revision},
text_block::DocumentPB,
},
errors::{CollaborateError, CollaborateResult},
};
@ -149,7 +149,7 @@ pub fn make_folder_from_revisions_pb(
pub fn make_document_from_revision_pbs(
doc_id: &str,
revisions: RepeatedRevision,
) -> Result<Option<DocumentPB>, CollaborateError> {
) -> Result<Option<DocumentPayloadPB>, CollaborateError> {
let revisions = revisions.into_inner();
if revisions.is_empty() {
return Ok(None);
@ -172,9 +172,9 @@ pub fn make_document_from_revision_pbs(
let text = delta.json_str();
Ok(Some(DocumentPB {
block_id: doc_id.to_owned(),
text,
Ok(Some(DocumentPayloadPB {
doc_id: doc_id.to_owned(),
content: text,
rev_id,
base_rev_id,
}))

View File

@ -8,8 +8,8 @@ edition = "2018"
[dependencies]
bytecount = "0.6.0"
serde = { version = "1.0", features = ["derive", "rc"] }
#flowy-revision = { path = "../../frontend/rust-lib/flowy-revision" }
#protobuf = {version = "2.18.0"}
#flowy-derive = { path = "../flowy-derive" }
tokio = { version = "1", features = ["sync"] }
dashmap = "5"
md5 = "0.7.0"

View File

@ -1,5 +1,6 @@
use crate::core::delta::operation::OperationAttributes;
use crate::core::delta::DeltaOperations;
use serde::de::DeserializeOwned;
use serde::{
de::{SeqAccess, Visitor},
ser::SerializeSeq,