From 23f4684d3f361cd8727398b8d8e4acb4cebfeb1a Mon Sep 17 00:00:00 2001 From: appflowy Date: Tue, 7 Dec 2021 22:32:34 +0800 Subject: [PATCH] add revision cache layer --- backend/Cargo.lock | 5 + backend/src/services/doc/edit/edit_actor.rs | 3 +- backend/src/services/doc/edit/editor.rs | 8 +- backend/src/services/doc/manager.rs | 3 +- backend/src/services/doc/ws_actor.rs | 3 +- .../flowy-document-infra/protobuf.dart | 1 - .../revision.pb.dart => lib-ot/model.pb.dart} | 100 ++--- .../model.pbenum.dart} | 2 +- .../model.pbjson.dart} | 22 +- .../model.pbserver.dart} | 4 +- .../lib/protobuf/lib-ot/protobuf.dart | 2 + .../src/services/doc/edit/editor.rs | 3 +- .../src/services/doc/edit/model.rs | 3 +- .../src/services/doc/edit/queue.rs | 2 +- .../src/services/doc/revision/manager.rs | 11 +- .../src/services/doc/revision/model.rs | 41 +- .../src/services/doc/revision/persistence.rs | 22 +- .../src/sql_tables/doc/rev_sql.rs | 6 +- .../src/sql_tables/doc/rev_table.rs | 36 +- shared-lib/Cargo.lock | 5 + .../src/derive_cache/derive_cache.rs | 8 +- .../src/entities/doc/mod.rs | 2 - .../src/entities/ws/ws.rs | 6 +- .../src/protobuf/model/mod.rs | 3 - shared-lib/lib-ot/Cargo.toml | 6 + shared-lib/lib-ot/Flowy.toml | 3 + shared-lib/lib-ot/src/errors.rs | 26 +- shared-lib/lib-ot/src/lib.rs | 2 + shared-lib/lib-ot/src/protobuf/mod.rs | 4 + shared-lib/lib-ot/src/protobuf/model/mod.rs | 5 + .../src/protobuf/model/model.rs} | 399 +++++++++--------- .../src/protobuf/proto/model.proto} | 6 +- shared-lib/lib-ot/src/revision/cache.rs | 109 +++++ shared-lib/lib-ot/src/revision/mod.rs | 5 + .../src/revision/model.rs} | 100 ++--- 35 files changed, 549 insertions(+), 417 deletions(-) rename frontend/app_flowy/packages/flowy_sdk/lib/protobuf/{flowy-document-infra/revision.pb.dart => lib-ot/model.pb.dart} (99%) rename frontend/app_flowy/packages/flowy_sdk/lib/protobuf/{flowy-document-infra/revision.pbenum.dart => lib-ot/model.pbenum.dart} (97%) rename frontend/app_flowy/packages/flowy_sdk/lib/protobuf/{flowy-document-infra/revision.pbjson.dart => lib-ot/model.pbjson.dart} (99%) rename frontend/app_flowy/packages/flowy_sdk/lib/protobuf/{flowy-document-infra/revision.pbserver.dart => lib-ot/model.pbserver.dart} (85%) create mode 100644 frontend/app_flowy/packages/flowy_sdk/lib/protobuf/lib-ot/protobuf.dart create mode 100644 shared-lib/lib-ot/Flowy.toml create mode 100644 shared-lib/lib-ot/src/protobuf/mod.rs create mode 100644 shared-lib/lib-ot/src/protobuf/model/mod.rs rename shared-lib/{flowy-document-infra/src/protobuf/model/revision.rs => lib-ot/src/protobuf/model/model.rs} (87%) rename shared-lib/{flowy-document-infra/src/protobuf/proto/revision.proto => lib-ot/src/protobuf/proto/model.proto} (100%) create mode 100644 shared-lib/lib-ot/src/revision/cache.rs create mode 100644 shared-lib/lib-ot/src/revision/mod.rs rename shared-lib/{flowy-document-infra/src/entities/doc/revision.rs => lib-ot/src/revision/model.rs} (79%) diff --git a/backend/Cargo.lock b/backend/Cargo.lock index 7efe8c9b83..2b3c3bd2c3 100644 --- a/backend/Cargo.lock +++ b/backend/Cargo.lock @@ -1972,13 +1972,18 @@ version = "0.1.0" dependencies = [ "bytecount", "bytes", + "dashmap", "derive_more", + "flowy-derive", "lazy_static", "log", + "md5", + "protobuf", "serde", "serde_json", "strum", "strum_macros", + "tokio", "tracing", ] diff --git a/backend/src/services/doc/edit/edit_actor.rs b/backend/src/services/doc/edit/edit_actor.rs index 985bf49d00..f73dfa0237 100644 --- a/backend/src/services/doc/edit/edit_actor.rs +++ b/backend/src/services/doc/edit/edit_actor.rs @@ -5,8 +5,9 @@ use crate::{ use actix_web::web::Data; use async_stream::stream; use backend_service::errors::{internal_error, Result as DocResult, ServerError}; -use flowy_document_infra::protobuf::{Doc, Revision}; +use flowy_document_infra::protobuf::Doc; use futures::stream::StreamExt; +use lib_ot::protobuf::Revision; use sqlx::PgPool; use std::sync::{atomic::Ordering::SeqCst, Arc}; use tokio::{ diff --git a/backend/src/services/doc/edit/editor.rs b/backend/src/services/doc/edit/editor.rs index 49b0929769..96b10c710d 100644 --- a/backend/src/services/doc/edit/editor.rs +++ b/backend/src/services/doc/edit/editor.rs @@ -11,9 +11,13 @@ use dashmap::DashMap; use flowy_document_infra::{ core::Document, entities::ws::{WsDataType, WsDocumentData}, - protobuf::{Doc, RevId, RevType, Revision, RevisionRange, UpdateDocParams}, + protobuf::{Doc, UpdateDocParams}, +}; +use lib_ot::{ + core::OperationTransformable, + protobuf::{RevId, RevType, Revision, RevisionRange}, + rich_text::RichTextDelta, }; -use lib_ot::{core::OperationTransformable, rich_text::RichTextDelta}; use parking_lot::RwLock; use protobuf::Message; use sqlx::PgPool; diff --git a/backend/src/services/doc/manager.rs b/backend/src/services/doc/manager.rs index 960edd5e8e..f0de848da1 100644 --- a/backend/src/services/doc/manager.rs +++ b/backend/src/services/doc/manager.rs @@ -9,7 +9,8 @@ use crate::{ use actix_web::web::Data; use backend_service::errors::{internal_error, Result as DocResult, ServerError}; use dashmap::DashMap; -use flowy_document_infra::protobuf::{Doc, DocIdentifier, Revision}; +use flowy_document_infra::protobuf::{Doc, DocIdentifier}; +use lib_ot::protobuf::Revision; use sqlx::PgPool; use std::sync::Arc; use tokio::{ diff --git a/backend/src/services/doc/ws_actor.rs b/backend/src/services/doc/ws_actor.rs index 0ba431fff6..1a8fa60de0 100644 --- a/backend/src/services/doc/ws_actor.rs +++ b/backend/src/services/doc/ws_actor.rs @@ -9,8 +9,9 @@ use actix_rt::task::spawn_blocking; use actix_web::web::Data; use async_stream::stream; use backend_service::errors::{internal_error, Result as DocResult, ServerError}; -use flowy_document_infra::protobuf::{NewDocUser, Revision, WsDataType, WsDocumentData}; +use flowy_document_infra::protobuf::{NewDocUser, WsDataType, WsDocumentData}; use futures::stream::StreamExt; +use lib_ot::protobuf::Revision; use sqlx::PgPool; use std::sync::Arc; use tokio::sync::{mpsc, oneshot}; diff --git a/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document-infra/protobuf.dart b/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document-infra/protobuf.dart index 7626c5708b..bba338f3b8 100644 --- a/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document-infra/protobuf.dart +++ b/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document-infra/protobuf.dart @@ -1,4 +1,3 @@ // Auto-generated, do not edit export './ws.pb.dart'; -export './revision.pb.dart'; export './doc.pb.dart'; diff --git a/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document-infra/revision.pb.dart b/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/lib-ot/model.pb.dart similarity index 99% rename from frontend/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document-infra/revision.pb.dart rename to frontend/app_flowy/packages/flowy_sdk/lib/protobuf/lib-ot/model.pb.dart index 979820558d..a19ab537be 100644 --- a/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document-infra/revision.pb.dart +++ b/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/lib-ot/model.pb.dart @@ -1,6 +1,6 @@ /// // Generated code. Do not modify. -// source: revision.proto +// source: model.proto // // @dart = 2.12 // ignore_for_file: annotate_overrides,camel_case_types,unnecessary_const,non_constant_identifier_names,library_prefixes,unused_import,unused_shown_name,return_of_invalid_type,unnecessary_this,prefer_final_fields @@ -10,56 +10,9 @@ import 'dart:core' as $core; import 'package:fixnum/fixnum.dart' as $fixnum; import 'package:protobuf/protobuf.dart' as $pb; -import 'revision.pbenum.dart'; +import 'model.pbenum.dart'; -export 'revision.pbenum.dart'; - -class RevId extends $pb.GeneratedMessage { - static final $pb.BuilderInfo _i = $pb.BuilderInfo(const $core.bool.fromEnvironment('protobuf.omit_message_names') ? '' : 'RevId', createEmptyInstance: create) - ..aInt64(1, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'value') - ..hasRequiredFields = false - ; - - RevId._() : super(); - factory RevId({ - $fixnum.Int64? value, - }) { - final _result = create(); - if (value != null) { - _result.value = value; - } - return _result; - } - factory RevId.fromBuffer($core.List<$core.int> i, [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) => create()..mergeFromBuffer(i, r); - factory RevId.fromJson($core.String i, [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) => create()..mergeFromJson(i, r); - @$core.Deprecated( - 'Using this can add significant overhead to your binary. ' - 'Use [GeneratedMessageGenericExtensions.deepCopy] instead. ' - 'Will be removed in next major version') - RevId clone() => RevId()..mergeFromMessage(this); - @$core.Deprecated( - 'Using this can add significant overhead to your binary. ' - 'Use [GeneratedMessageGenericExtensions.rebuild] instead. ' - 'Will be removed in next major version') - RevId copyWith(void Function(RevId) updates) => super.copyWith((message) => updates(message as RevId)) as RevId; // ignore: deprecated_member_use - $pb.BuilderInfo get info_ => _i; - @$core.pragma('dart2js:noInline') - static RevId create() => RevId._(); - RevId createEmptyInstance() => create(); - static $pb.PbList createRepeated() => $pb.PbList(); - @$core.pragma('dart2js:noInline') - static RevId getDefault() => _defaultInstance ??= $pb.GeneratedMessage.$_defaultFor(create); - static RevId? _defaultInstance; - - @$pb.TagNumber(1) - $fixnum.Int64 get value => $_getI64(0); - @$pb.TagNumber(1) - set value($fixnum.Int64 v) { $_setInt64(0, v); } - @$pb.TagNumber(1) - $core.bool hasValue() => $_has(0); - @$pb.TagNumber(1) - void clearValue() => clearField(1); -} +export 'model.pbenum.dart'; class Revision extends $pb.GeneratedMessage { static final $pb.BuilderInfo _i = $pb.BuilderInfo(const $core.bool.fromEnvironment('protobuf.omit_message_names') ? '' : 'Revision', createEmptyInstance: create) @@ -178,6 +131,53 @@ class Revision extends $pb.GeneratedMessage { void clearTy() => clearField(6); } +class RevId extends $pb.GeneratedMessage { + static final $pb.BuilderInfo _i = $pb.BuilderInfo(const $core.bool.fromEnvironment('protobuf.omit_message_names') ? '' : 'RevId', createEmptyInstance: create) + ..aInt64(1, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'value') + ..hasRequiredFields = false + ; + + RevId._() : super(); + factory RevId({ + $fixnum.Int64? value, + }) { + final _result = create(); + if (value != null) { + _result.value = value; + } + return _result; + } + factory RevId.fromBuffer($core.List<$core.int> i, [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) => create()..mergeFromBuffer(i, r); + factory RevId.fromJson($core.String i, [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) => create()..mergeFromJson(i, r); + @$core.Deprecated( + 'Using this can add significant overhead to your binary. ' + 'Use [GeneratedMessageGenericExtensions.deepCopy] instead. ' + 'Will be removed in next major version') + RevId clone() => RevId()..mergeFromMessage(this); + @$core.Deprecated( + 'Using this can add significant overhead to your binary. ' + 'Use [GeneratedMessageGenericExtensions.rebuild] instead. ' + 'Will be removed in next major version') + RevId copyWith(void Function(RevId) updates) => super.copyWith((message) => updates(message as RevId)) as RevId; // ignore: deprecated_member_use + $pb.BuilderInfo get info_ => _i; + @$core.pragma('dart2js:noInline') + static RevId create() => RevId._(); + RevId createEmptyInstance() => create(); + static $pb.PbList createRepeated() => $pb.PbList(); + @$core.pragma('dart2js:noInline') + static RevId getDefault() => _defaultInstance ??= $pb.GeneratedMessage.$_defaultFor(create); + static RevId? _defaultInstance; + + @$pb.TagNumber(1) + $fixnum.Int64 get value => $_getI64(0); + @$pb.TagNumber(1) + set value($fixnum.Int64 v) { $_setInt64(0, v); } + @$pb.TagNumber(1) + $core.bool hasValue() => $_has(0); + @$pb.TagNumber(1) + void clearValue() => clearField(1); +} + class RevisionRange extends $pb.GeneratedMessage { static final $pb.BuilderInfo _i = $pb.BuilderInfo(const $core.bool.fromEnvironment('protobuf.omit_message_names') ? '' : 'RevisionRange', createEmptyInstance: create) ..aOS(1, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'docId') diff --git a/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document-infra/revision.pbenum.dart b/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/lib-ot/model.pbenum.dart similarity index 97% rename from frontend/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document-infra/revision.pbenum.dart rename to frontend/app_flowy/packages/flowy_sdk/lib/protobuf/lib-ot/model.pbenum.dart index 0921ea3229..cd22cc9f89 100644 --- a/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document-infra/revision.pbenum.dart +++ b/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/lib-ot/model.pbenum.dart @@ -1,6 +1,6 @@ /// // Generated code. Do not modify. -// source: revision.proto +// source: model.proto // // @dart = 2.12 // ignore_for_file: annotate_overrides,camel_case_types,unnecessary_const,non_constant_identifier_names,library_prefixes,unused_import,unused_shown_name,return_of_invalid_type,unnecessary_this,prefer_final_fields diff --git a/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document-infra/revision.pbjson.dart b/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/lib-ot/model.pbjson.dart similarity index 99% rename from frontend/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document-infra/revision.pbjson.dart rename to frontend/app_flowy/packages/flowy_sdk/lib/protobuf/lib-ot/model.pbjson.dart index 08933d02c7..8b52c2439e 100644 --- a/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document-infra/revision.pbjson.dart +++ b/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/lib-ot/model.pbjson.dart @@ -1,6 +1,6 @@ /// // Generated code. Do not modify. -// source: revision.proto +// source: model.proto // // @dart = 2.12 // ignore_for_file: annotate_overrides,camel_case_types,unnecessary_const,non_constant_identifier_names,library_prefixes,unused_import,unused_shown_name,return_of_invalid_type,unnecessary_this,prefer_final_fields,deprecated_member_use_from_same_package @@ -19,16 +19,6 @@ const RevType$json = const { /// Descriptor for `RevType`. Decode as a `google.protobuf.EnumDescriptorProto`. final $typed_data.Uint8List revTypeDescriptor = $convert.base64Decode('CgdSZXZUeXBlEgkKBUxvY2FsEAASCgoGUmVtb3RlEAE='); -@$core.Deprecated('Use revIdDescriptor instead') -const RevId$json = const { - '1': 'RevId', - '2': const [ - const {'1': 'value', '3': 1, '4': 1, '5': 3, '10': 'value'}, - ], -}; - -/// Descriptor for `RevId`. Decode as a `google.protobuf.DescriptorProto`. -final $typed_data.Uint8List revIdDescriptor = $convert.base64Decode('CgVSZXZJZBIUCgV2YWx1ZRgBIAEoA1IFdmFsdWU='); @$core.Deprecated('Use revisionDescriptor instead') const Revision$json = const { '1': 'Revision', @@ -44,6 +34,16 @@ const Revision$json = const { /// Descriptor for `Revision`. Decode as a `google.protobuf.DescriptorProto`. final $typed_data.Uint8List revisionDescriptor = $convert.base64Decode('CghSZXZpc2lvbhIeCgtiYXNlX3Jldl9pZBgBIAEoA1IJYmFzZVJldklkEhUKBnJldl9pZBgCIAEoA1IFcmV2SWQSHQoKZGVsdGFfZGF0YRgDIAEoDFIJZGVsdGFEYXRhEhAKA21kNRgEIAEoCVIDbWQ1EhUKBmRvY19pZBgFIAEoCVIFZG9jSWQSGAoCdHkYBiABKA4yCC5SZXZUeXBlUgJ0eQ=='); +@$core.Deprecated('Use revIdDescriptor instead') +const RevId$json = const { + '1': 'RevId', + '2': const [ + const {'1': 'value', '3': 1, '4': 1, '5': 3, '10': 'value'}, + ], +}; + +/// Descriptor for `RevId`. Decode as a `google.protobuf.DescriptorProto`. +final $typed_data.Uint8List revIdDescriptor = $convert.base64Decode('CgVSZXZJZBIUCgV2YWx1ZRgBIAEoA1IFdmFsdWU='); @$core.Deprecated('Use revisionRangeDescriptor instead') const RevisionRange$json = const { '1': 'RevisionRange', diff --git a/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document-infra/revision.pbserver.dart b/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/lib-ot/model.pbserver.dart similarity index 85% rename from frontend/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document-infra/revision.pbserver.dart rename to frontend/app_flowy/packages/flowy_sdk/lib/protobuf/lib-ot/model.pbserver.dart index 4797cc3361..0a058aba64 100644 --- a/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document-infra/revision.pbserver.dart +++ b/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/lib-ot/model.pbserver.dart @@ -1,9 +1,9 @@ /// // Generated code. Do not modify. -// source: revision.proto +// source: model.proto // // @dart = 2.12 // ignore_for_file: annotate_overrides,camel_case_types,unnecessary_const,non_constant_identifier_names,library_prefixes,unused_import,unused_shown_name,return_of_invalid_type,unnecessary_this,prefer_final_fields,deprecated_member_use_from_same_package -export 'revision.pb.dart'; +export 'model.pb.dart'; diff --git a/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/lib-ot/protobuf.dart b/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/lib-ot/protobuf.dart new file mode 100644 index 0000000000..f524faf0c8 --- /dev/null +++ b/frontend/app_flowy/packages/flowy_sdk/lib/protobuf/lib-ot/protobuf.dart @@ -0,0 +1,2 @@ +// Auto-generated, do not edit +export './model.pb.dart'; diff --git a/frontend/rust-lib/flowy-document/src/services/doc/edit/editor.rs b/frontend/rust-lib/flowy-document/src/services/doc/edit/editor.rs index 56560b18d8..119a2335cf 100644 --- a/frontend/rust-lib/flowy-document/src/services/doc/edit/editor.rs +++ b/frontend/rust-lib/flowy-document/src/services/doc/edit/editor.rs @@ -11,7 +11,7 @@ use flowy_database::ConnectionPool; use flowy_document_infra::{ core::history::UndoResult, entities::{ - doc::{DocDelta, RevId, RevType, Revision, RevisionRange}, + doc::DocDelta, ws::{WsDataType, WsDocumentData}, }, errors::DocumentResult, @@ -19,6 +19,7 @@ use flowy_document_infra::{ use lib_infra::retry::{ExponentialBackoff, Retry}; use lib_ot::{ core::Interval, + revision::{RevId, RevType, Revision, RevisionRange}, rich_text::{RichTextAttribute, RichTextDelta}, }; use lib_ws::WsConnectState; diff --git a/frontend/rust-lib/flowy-document/src/services/doc/edit/model.rs b/frontend/rust-lib/flowy-document/src/services/doc/edit/model.rs index 62bbb76da4..2916dce85a 100644 --- a/frontend/rust-lib/flowy-document/src/services/doc/edit/model.rs +++ b/frontend/rust-lib/flowy-document/src/services/doc/edit/model.rs @@ -1,7 +1,8 @@ use crate::{errors::DocError, services::ws::DocumentWebSocket}; -use flowy_document_infra::entities::doc::{NewDocUser, RevId}; +use flowy_document_infra::entities::doc::NewDocUser; use futures::future::BoxFuture; use lib_infra::retry::Action; +use lib_ot::revision::RevId; use std::{future, sync::Arc}; pub(crate) struct OpenDocAction { diff --git a/frontend/rust-lib/flowy-document/src/services/doc/edit/queue.rs b/frontend/rust-lib/flowy-document/src/services/doc/edit/queue.rs index d5a2dc10dd..5e9c94dd10 100644 --- a/frontend/rust-lib/flowy-document/src/services/doc/edit/queue.rs +++ b/frontend/rust-lib/flowy-document/src/services/doc/edit/queue.rs @@ -2,12 +2,12 @@ use async_stream::stream; use bytes::Bytes; use flowy_document_infra::{ core::{history::UndoResult, Document}, - entities::doc::{RevId, Revision}, errors::DocumentError, }; use futures::stream::StreamExt; use lib_ot::{ core::{Interval, OperationTransformable}, + revision::{RevId, Revision}, rich_text::{RichTextAttribute, RichTextDelta}, }; use std::{convert::TryFrom, sync::Arc}; diff --git a/frontend/rust-lib/flowy-document/src/services/doc/revision/manager.rs b/frontend/rust-lib/flowy-document/src/services/doc/revision/manager.rs index c765b4bbfa..84f9679227 100644 --- a/frontend/rust-lib/flowy-document/src/services/doc/revision/manager.rs +++ b/frontend/rust-lib/flowy-document/src/services/doc/revision/manager.rs @@ -3,12 +3,13 @@ use crate::{ services::doc::revision::RevisionStore, }; use flowy_database::ConnectionPool; -use flowy_document_infra::{ - entities::doc::{Doc, RevId, RevType, Revision, RevisionRange}, - util::RevIdCounter, -}; +use flowy_document_infra::{entities::doc::Doc, util::RevIdCounter}; use lib_infra::future::ResultFuture; -use lib_ot::{core::OperationTransformable, rich_text::RichTextDelta}; +use lib_ot::{ + core::OperationTransformable, + revision::{RevId, RevType, Revision, RevisionRange}, + rich_text::RichTextDelta, +}; use std::sync::Arc; use tokio::sync::mpsc; diff --git a/frontend/rust-lib/flowy-document/src/services/doc/revision/model.rs b/frontend/rust-lib/flowy-document/src/services/doc/revision/model.rs index 2922a04383..51e2d68fe3 100644 --- a/frontend/rust-lib/flowy-document/src/services/doc/revision/model.rs +++ b/frontend/rust-lib/flowy-document/src/services/doc/revision/model.rs @@ -1,48 +1,13 @@ use crate::{ errors::{internal_error, DocError, DocResult}, - sql_tables::{RevState, RevTableSql}, + sql_tables::{RevTableSql, SqlRevState}, }; use flowy_database::ConnectionPool; -use flowy_document_infra::entities::doc::{Revision, RevisionRange}; use lib_infra::future::ResultFuture; +use lib_ot::revision::{Revision, RevisionRange}; use std::sync::Arc; use tokio::sync::broadcast; -pub type RevIdReceiver = broadcast::Receiver; -pub type RevIdSender = broadcast::Sender; - -pub struct RevisionRecord { - pub revision: Revision, - pub state: RevState, -} - -impl RevisionRecord { - pub fn new(revision: Revision) -> Self { - Self { - revision, - state: RevState::Local, - } - } -} - -pub(crate) struct PendingRevId { - pub rev_id: i64, - pub sender: RevIdSender, -} - -impl PendingRevId { - pub(crate) fn new(rev_id: i64, sender: RevIdSender) -> Self { Self { rev_id, sender } } - - pub(crate) fn finish(&self, rev_id: i64) -> bool { - if self.rev_id > rev_id { - false - } else { - let _ = self.sender.send(self.rev_id); - true - } - } -} - pub(crate) struct Persistence { pub(crate) rev_sql: Arc, pub(crate) pool: Arc, @@ -54,7 +19,7 @@ impl Persistence { Self { rev_sql, pool } } - pub(crate) fn create_revs(&self, revisions: Vec<(Revision, RevState)>) -> DocResult<()> { + pub(crate) fn create_revs(&self, revisions: Vec<(Revision, SqlRevState)>) -> DocResult<()> { let conn = &*self.pool.get().map_err(internal_error)?; conn.immediate_transaction::<_, DocError, _>(|| { let _ = self.rev_sql.create_rev_table(revisions, conn)?; diff --git a/frontend/rust-lib/flowy-document/src/services/doc/revision/persistence.rs b/frontend/rust-lib/flowy-document/src/services/doc/revision/persistence.rs index 6c0f808726..5b866181a9 100644 --- a/frontend/rust-lib/flowy-document/src/services/doc/revision/persistence.rs +++ b/frontend/rust-lib/flowy-document/src/services/doc/revision/persistence.rs @@ -1,16 +1,17 @@ use crate::{ errors::{internal_error, DocError, DocResult}, services::doc::revision::{model::*, RevisionServer}, - sql_tables::RevState, + sql_tables::SqlRevState, }; use async_stream::stream; use dashmap::DashMap; use flowy_database::{ConnectionPool, SqliteConnection}; -use flowy_document_infra::entities::doc::{revision_from_doc, Doc, RevId, RevType, Revision, RevisionRange}; +use flowy_document_infra::entities::doc::Doc; use futures::stream::StreamExt; use lib_infra::future::ResultFuture; use lib_ot::{ core::{Operation, OperationTransformable}, + revision::{PendingRevId, RevId, RevIdReceiver, RevType, Revision, RevisionRange, RevisionRecord}, rich_text::RichTextDelta, }; use std::{collections::VecDeque, sync::Arc, time::Duration}; @@ -52,7 +53,7 @@ impl RevisionStore { server, }); - tokio::spawn(RevisionStream::new(store.clone(), pending_rx, ws_revision_sender).run()); + tokio::spawn(RevisionUploadStream::new(store.clone(), pending_rx, ws_revision_sender).run()); store } @@ -70,7 +71,7 @@ impl RevisionStore { if let Ok(rev_id) = rx.recv().await { match revs_map.get_mut(&rev_id) { None => {}, - Some(mut rev) => rev.value_mut().state = RevState::Acked, + Some(mut rev) => rev.value_mut().state = SqlRevState::Acked.into(), } } }); @@ -113,7 +114,7 @@ impl RevisionStore { let revisions_state = revs_map .iter() .map(|kv| (kv.revision.clone(), kv.state)) - .collect::>(); + .collect::>(); match persistence.create_revs(revisions_state.clone()) { Ok(_) => { @@ -157,11 +158,16 @@ impl RevisionStore { let doc = self.server.fetch_document_from_remote(&self.doc_id).await?; let revision = revision_from_doc(doc.clone(), RevType::Remote); - let _ = self.persistence.create_revs(vec![(revision, RevState::Acked)])?; + let _ = self.persistence.create_revs(vec![(revision, SqlRevState::Acked)])?; Ok(doc) } } +pub fn revision_from_doc(doc: Doc, ty: RevType) -> Revision { + let delta_data = doc.data.as_bytes(); + Revision::new(doc.base_rev_id, doc.rev_id, delta_data.to_owned(), &doc.id, ty) +} + impl RevisionIterator for RevisionStore { fn next(&self) -> ResultFuture, DocError> { let pending_revs = self.pending_revs.clone(); @@ -316,13 +322,13 @@ pub(crate) enum PendingMsg { pub(crate) type PendingSender = mpsc::UnboundedSender; pub(crate) type PendingReceiver = mpsc::UnboundedReceiver; -pub(crate) struct RevisionStream { +pub(crate) struct RevisionUploadStream { revisions: Arc, receiver: Option, ws_revision_sender: mpsc::UnboundedSender, } -impl RevisionStream { +impl RevisionUploadStream { pub(crate) fn new( revisions: Arc, pending_rx: PendingReceiver, diff --git a/frontend/rust-lib/flowy-document/src/sql_tables/doc/rev_sql.rs b/frontend/rust-lib/flowy-document/src/sql_tables/doc/rev_sql.rs index cc5d15a463..2098693535 100644 --- a/frontend/rust-lib/flowy-document/src/sql_tables/doc/rev_sql.rs +++ b/frontend/rust-lib/flowy-document/src/sql_tables/doc/rev_sql.rs @@ -1,17 +1,17 @@ use crate::{ errors::DocError, - sql_tables::{doc::RevTable, RevChangeset, RevState, RevTableType}, + sql_tables::{doc::RevTable, RevChangeset, RevTableType, SqlRevState}, }; use diesel::update; use flowy_database::{insert_or_ignore_into, prelude::*, schema::rev_table::dsl, SqliteConnection}; -use flowy_document_infra::entities::doc::{Revision, RevisionRange}; +use lib_ot::revision::{Revision, RevisionRange}; pub struct RevTableSql {} impl RevTableSql { pub(crate) fn create_rev_table( &self, - revisions: Vec<(Revision, RevState)>, + revisions: Vec<(Revision, SqlRevState)>, conn: &SqliteConnection, ) -> Result<(), DocError> { // Batch insert: https://diesel.rs/guides/all-about-inserts.html diff --git a/frontend/rust-lib/flowy-document/src/sql_tables/doc/rev_table.rs b/frontend/rust-lib/flowy-document/src/sql_tables/doc/rev_table.rs index 6294168744..6d5ff19052 100644 --- a/frontend/rust-lib/flowy-document/src/sql_tables/doc/rev_table.rs +++ b/frontend/rust-lib/flowy-document/src/sql_tables/doc/rev_table.rs @@ -1,9 +1,7 @@ use diesel::sql_types::Integer; use flowy_database::schema::rev_table; -use flowy_document_infra::{ - entities::doc::{RevId, RevType, Revision}, - util::md5, -}; +use flowy_document_infra::util::md5; +use lib_ot::revision::{RevId, RevState, RevType, Revision}; #[derive(PartialEq, Clone, Debug, Queryable, Identifiable, Insertable, Associations)] #[table_name = "rev_table"] @@ -13,39 +11,49 @@ pub(crate) struct RevTable { pub(crate) base_rev_id: i64, pub(crate) rev_id: i64, pub(crate) data: Vec, - pub(crate) state: RevState, + pub(crate) state: SqlRevState, pub(crate) ty: RevTableType, } #[derive(Clone, Copy, PartialEq, Eq, Debug, Hash, FromSqlRow, AsExpression)] #[repr(i32)] #[sql_type = "Integer"] -pub enum RevState { +pub enum SqlRevState { Local = 0, Acked = 1, } -impl std::default::Default for RevState { - fn default() -> Self { RevState::Local } +impl std::default::Default for SqlRevState { + fn default() -> Self { SqlRevState::Local } } -impl std::convert::From for RevState { +impl std::convert::From for SqlRevState { fn from(value: i32) -> Self { match value { - 0 => RevState::Local, - 1 => RevState::Acked, + 0 => SqlRevState::Local, + 1 => SqlRevState::Acked, o => { log::error!("Unsupported rev state {}, fallback to RevState::Local", o); - RevState::Local + SqlRevState::Local }, } } } -impl RevState { + +impl SqlRevState { pub fn value(&self) -> i32 { *self as i32 } } impl_sql_integer_expression!(RevState); +impl std::convert::From for RevState { + fn from(s: SqlRevState) -> Self { + match s { + SqlRevState::Local => RevState.Local, + SqlRevState::Acked => RevState.Acked, + } + } +} + impl std::convert::From for Revision { fn from(table: RevTable) -> Self { let md5 = md5(&table.data); @@ -111,5 +119,5 @@ impl_sql_integer_expression!(RevTableType); pub(crate) struct RevChangeset { pub(crate) doc_id: String, pub(crate) rev_id: RevId, - pub(crate) state: RevState, + pub(crate) state: SqlRevState, } diff --git a/shared-lib/Cargo.lock b/shared-lib/Cargo.lock index b6c108e131..eafd1ca8d7 100644 --- a/shared-lib/Cargo.lock +++ b/shared-lib/Cargo.lock @@ -1128,13 +1128,18 @@ version = "0.1.0" dependencies = [ "bytecount", "bytes", + "dashmap", "derive_more", + "flowy-derive", "lazy_static", "log", + "md5", + "protobuf", "serde", "serde_json", "strum", "strum_macros", + "tokio", "tracing", ] diff --git a/shared-lib/flowy-derive/src/derive_cache/derive_cache.rs b/shared-lib/flowy-derive/src/derive_cache/derive_cache.rs index bbb6284bf4..f5f90b2b79 100644 --- a/shared-lib/flowy-derive/src/derive_cache/derive_cache.rs +++ b/shared-lib/flowy-derive/src/derive_cache/derive_cache.rs @@ -63,12 +63,12 @@ pub fn category_from_str(type_str: &str) -> TypeCategory { | "DocDelta" | "NewDocUser" | "DocIdentifier" - | "RevId" - | "Revision" - | "RevisionRange" | "WsDocumentData" | "WsError" | "WsMessage" + | "Revision" + | "RevId" + | "RevisionRange" | "SignInRequest" | "SignInParams" | "SignInResponse" @@ -91,9 +91,9 @@ pub fn category_from_str(type_str: &str) -> TypeCategory { | "TrashType" | "ViewType" | "ExportType" - | "RevType" | "WsDataType" | "WsModule" + | "RevType" => TypeCategory::Enum, "Option" => TypeCategory::Opt, diff --git a/shared-lib/flowy-document-infra/src/entities/doc/mod.rs b/shared-lib/flowy-document-infra/src/entities/doc/mod.rs index 3bfe846c63..ef2ed9076d 100644 --- a/shared-lib/flowy-document-infra/src/entities/doc/mod.rs +++ b/shared-lib/flowy-document-infra/src/entities/doc/mod.rs @@ -1,7 +1,5 @@ #![allow(clippy::module_inception)] mod doc; pub mod parser; -mod revision; pub use doc::*; -pub use revision::*; diff --git a/shared-lib/flowy-document-infra/src/entities/ws/ws.rs b/shared-lib/flowy-document-infra/src/entities/ws/ws.rs index b332a0f79c..c7457eaf12 100644 --- a/shared-lib/flowy-document-infra/src/entities/ws/ws.rs +++ b/shared-lib/flowy-document-infra/src/entities/ws/ws.rs @@ -1,9 +1,7 @@ -use crate::{ - entities::doc::{NewDocUser, Revision}, - errors::DocumentError, -}; +use crate::{entities::doc::NewDocUser, errors::DocumentError}; use bytes::Bytes; use flowy_derive::{ProtoBuf, ProtoBuf_Enum}; +use lib_ot::revision::Revision; use std::convert::{TryFrom, TryInto}; #[derive(Debug, Clone, ProtoBuf_Enum, Eq, PartialEq, Hash)] diff --git a/shared-lib/flowy-document-infra/src/protobuf/model/mod.rs b/shared-lib/flowy-document-infra/src/protobuf/model/mod.rs index 28f1c42882..8989e8fbdf 100644 --- a/shared-lib/flowy-document-infra/src/protobuf/model/mod.rs +++ b/shared-lib/flowy-document-infra/src/protobuf/model/mod.rs @@ -4,8 +4,5 @@ mod ws; pub use ws::*; -mod revision; -pub use revision::*; - mod doc; pub use doc::*; diff --git a/shared-lib/lib-ot/Cargo.toml b/shared-lib/lib-ot/Cargo.toml index 2fa2437165..d6b99d19e2 100644 --- a/shared-lib/lib-ot/Cargo.toml +++ b/shared-lib/lib-ot/Cargo.toml @@ -8,6 +8,12 @@ edition = "2018" [dependencies] bytecount = "0.6.0" serde = { version = "1.0", features = ["derive"] } +protobuf = {version = "2.18.0"} +flowy-derive = { path = "../flowy-derive" } +tokio = {version = "1", features = ["sync"]} +dashmap = "4.0" +md5 = "0.7.0" + serde_json = {version = "1.0"} derive_more = {version = "0.99", features = ["display"]} log = "0.4" diff --git a/shared-lib/lib-ot/Flowy.toml b/shared-lib/lib-ot/Flowy.toml new file mode 100644 index 0000000000..cd4d90ffd0 --- /dev/null +++ b/shared-lib/lib-ot/Flowy.toml @@ -0,0 +1,3 @@ + +proto_crates = ["src/revision"] +event_files = [] \ No newline at end of file diff --git a/shared-lib/lib-ot/src/errors.rs b/shared-lib/lib-ot/src/errors.rs index b7ae81150e..5f67d699ac 100644 --- a/shared-lib/lib-ot/src/errors.rs +++ b/shared-lib/lib-ot/src/errors.rs @@ -1,4 +1,4 @@ -use std::{error::Error, fmt, str::Utf8Error}; +use std::{error::Error, fmt, fmt::Debug, str::Utf8Error}; #[derive(Clone, Debug)] pub struct OTError { @@ -6,6 +6,22 @@ pub struct OTError { pub msg: String, } +macro_rules! static_ot_error { + ($name:ident, $code:expr) => { + #[allow(non_snake_case, missing_docs)] + pub fn $name() -> OTError { $code.into() } + }; +} + +impl std::convert::From for OTError { + fn from(code: OTErrorCode) -> Self { + OTError { + code: code.clone(), + msg: format!("{:?}", code), + } + } +} + impl OTError { pub fn new(code: OTErrorCode, msg: &str) -> OTError { Self { @@ -13,6 +29,13 @@ impl OTError { msg: msg.to_owned(), } } + + pub fn context(mut self, error: T) -> Self { + self.msg = format!("{:?}", error); + self + } + + static_ot_error!(duplicate_revision, OTErrorCode::DuplicatedRevision); } impl fmt::Display for OTError { @@ -42,6 +65,7 @@ pub enum OTErrorCode { UndoFail, RedoFail, SerdeError, + DuplicatedRevision, } pub struct ErrorBuilder { diff --git a/shared-lib/lib-ot/src/lib.rs b/shared-lib/lib-ot/src/lib.rs index 5a3be0ede7..f66fdabd84 100644 --- a/shared-lib/lib-ot/src/lib.rs +++ b/shared-lib/lib-ot/src/lib.rs @@ -1,3 +1,5 @@ pub mod core; pub mod errors; +pub mod protobuf; +pub mod revision; pub mod rich_text; diff --git a/shared-lib/lib-ot/src/protobuf/mod.rs b/shared-lib/lib-ot/src/protobuf/mod.rs new file mode 100644 index 0000000000..da97aad28a --- /dev/null +++ b/shared-lib/lib-ot/src/protobuf/mod.rs @@ -0,0 +1,4 @@ +#![cfg_attr(rustfmt, rustfmt::skip)] +// Auto-generated, do not edit +mod model; +pub use model::*; \ No newline at end of file diff --git a/shared-lib/lib-ot/src/protobuf/model/mod.rs b/shared-lib/lib-ot/src/protobuf/model/mod.rs new file mode 100644 index 0000000000..d30d09866a --- /dev/null +++ b/shared-lib/lib-ot/src/protobuf/model/mod.rs @@ -0,0 +1,5 @@ +#![cfg_attr(rustfmt, rustfmt::skip)] +// Auto-generated, do not edit + +mod model; +pub use model::*; diff --git a/shared-lib/flowy-document-infra/src/protobuf/model/revision.rs b/shared-lib/lib-ot/src/protobuf/model/model.rs similarity index 87% rename from shared-lib/flowy-document-infra/src/protobuf/model/revision.rs rename to shared-lib/lib-ot/src/protobuf/model/model.rs index 83bd5e7ad0..125f1cc78c 100644 --- a/shared-lib/flowy-document-infra/src/protobuf/model/revision.rs +++ b/shared-lib/lib-ot/src/protobuf/model/model.rs @@ -17,164 +17,12 @@ #![allow(trivial_casts)] #![allow(unused_imports)] #![allow(unused_results)] -//! Generated file from `revision.proto` +//! Generated file from `model.proto` /// Generated files are compatible only with the same version /// of protobuf runtime. // const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_22_1; -#[derive(PartialEq,Clone,Default)] -pub struct RevId { - // message fields - pub value: i64, - // special fields - pub unknown_fields: ::protobuf::UnknownFields, - pub cached_size: ::protobuf::CachedSize, -} - -impl<'a> ::std::default::Default for &'a RevId { - fn default() -> &'a RevId { - ::default_instance() - } -} - -impl RevId { - pub fn new() -> RevId { - ::std::default::Default::default() - } - - // int64 value = 1; - - - pub fn get_value(&self) -> i64 { - self.value - } - pub fn clear_value(&mut self) { - self.value = 0; - } - - // Param is passed by value, moved - pub fn set_value(&mut self, v: i64) { - self.value = v; - } -} - -impl ::protobuf::Message for RevId { - fn is_initialized(&self) -> bool { - true - } - - fn merge_from(&mut self, is: &mut ::protobuf::CodedInputStream<'_>) -> ::protobuf::ProtobufResult<()> { - while !is.eof()? { - let (field_number, wire_type) = is.read_tag_unpack()?; - match field_number { - 1 => { - if wire_type != ::protobuf::wire_format::WireTypeVarint { - return ::std::result::Result::Err(::protobuf::rt::unexpected_wire_type(wire_type)); - } - let tmp = is.read_int64()?; - self.value = tmp; - }, - _ => { - ::protobuf::rt::read_unknown_or_skip_group(field_number, wire_type, is, self.mut_unknown_fields())?; - }, - }; - } - ::std::result::Result::Ok(()) - } - - // Compute sizes of nested messages - #[allow(unused_variables)] - fn compute_size(&self) -> u32 { - let mut my_size = 0; - if self.value != 0 { - my_size += ::protobuf::rt::value_size(1, self.value, ::protobuf::wire_format::WireTypeVarint); - } - my_size += ::protobuf::rt::unknown_fields_size(self.get_unknown_fields()); - self.cached_size.set(my_size); - my_size - } - - fn write_to_with_cached_sizes(&self, os: &mut ::protobuf::CodedOutputStream<'_>) -> ::protobuf::ProtobufResult<()> { - if self.value != 0 { - os.write_int64(1, self.value)?; - } - os.write_unknown_fields(self.get_unknown_fields())?; - ::std::result::Result::Ok(()) - } - - fn get_cached_size(&self) -> u32 { - self.cached_size.get() - } - - fn get_unknown_fields(&self) -> &::protobuf::UnknownFields { - &self.unknown_fields - } - - fn mut_unknown_fields(&mut self) -> &mut ::protobuf::UnknownFields { - &mut self.unknown_fields - } - - fn as_any(&self) -> &dyn (::std::any::Any) { - self as &dyn (::std::any::Any) - } - fn as_any_mut(&mut self) -> &mut dyn (::std::any::Any) { - self as &mut dyn (::std::any::Any) - } - fn into_any(self: ::std::boxed::Box) -> ::std::boxed::Box { - self - } - - fn descriptor(&self) -> &'static ::protobuf::reflect::MessageDescriptor { - Self::descriptor_static() - } - - fn new() -> RevId { - RevId::new() - } - - fn descriptor_static() -> &'static ::protobuf::reflect::MessageDescriptor { - static descriptor: ::protobuf::rt::LazyV2<::protobuf::reflect::MessageDescriptor> = ::protobuf::rt::LazyV2::INIT; - descriptor.get(|| { - let mut fields = ::std::vec::Vec::new(); - fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeInt64>( - "value", - |m: &RevId| { &m.value }, - |m: &mut RevId| { &mut m.value }, - )); - ::protobuf::reflect::MessageDescriptor::new_pb_name::( - "RevId", - fields, - file_descriptor_proto() - ) - }) - } - - fn default_instance() -> &'static RevId { - static instance: ::protobuf::rt::LazyV2 = ::protobuf::rt::LazyV2::INIT; - instance.get(RevId::new) - } -} - -impl ::protobuf::Clear for RevId { - fn clear(&mut self) { - self.value = 0; - self.unknown_fields.clear(); - } -} - -impl ::std::fmt::Debug for RevId { - fn fmt(&self, f: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result { - ::protobuf::text_format::fmt(self, f) - } -} - -impl ::protobuf::reflect::ProtobufValue for RevId { - fn as_ref(&self) -> ::protobuf::reflect::ReflectValueRef { - ::protobuf::reflect::ReflectValueRef::Message(self) - } -} - #[derive(PartialEq,Clone,Default)] pub struct Revision { // message fields @@ -519,6 +367,158 @@ impl ::protobuf::reflect::ProtobufValue for Revision { } } +#[derive(PartialEq,Clone,Default)] +pub struct RevId { + // message fields + pub value: i64, + // special fields + pub unknown_fields: ::protobuf::UnknownFields, + pub cached_size: ::protobuf::CachedSize, +} + +impl<'a> ::std::default::Default for &'a RevId { + fn default() -> &'a RevId { + ::default_instance() + } +} + +impl RevId { + pub fn new() -> RevId { + ::std::default::Default::default() + } + + // int64 value = 1; + + + pub fn get_value(&self) -> i64 { + self.value + } + pub fn clear_value(&mut self) { + self.value = 0; + } + + // Param is passed by value, moved + pub fn set_value(&mut self, v: i64) { + self.value = v; + } +} + +impl ::protobuf::Message for RevId { + fn is_initialized(&self) -> bool { + true + } + + fn merge_from(&mut self, is: &mut ::protobuf::CodedInputStream<'_>) -> ::protobuf::ProtobufResult<()> { + while !is.eof()? { + let (field_number, wire_type) = is.read_tag_unpack()?; + match field_number { + 1 => { + if wire_type != ::protobuf::wire_format::WireTypeVarint { + return ::std::result::Result::Err(::protobuf::rt::unexpected_wire_type(wire_type)); + } + let tmp = is.read_int64()?; + self.value = tmp; + }, + _ => { + ::protobuf::rt::read_unknown_or_skip_group(field_number, wire_type, is, self.mut_unknown_fields())?; + }, + }; + } + ::std::result::Result::Ok(()) + } + + // Compute sizes of nested messages + #[allow(unused_variables)] + fn compute_size(&self) -> u32 { + let mut my_size = 0; + if self.value != 0 { + my_size += ::protobuf::rt::value_size(1, self.value, ::protobuf::wire_format::WireTypeVarint); + } + my_size += ::protobuf::rt::unknown_fields_size(self.get_unknown_fields()); + self.cached_size.set(my_size); + my_size + } + + fn write_to_with_cached_sizes(&self, os: &mut ::protobuf::CodedOutputStream<'_>) -> ::protobuf::ProtobufResult<()> { + if self.value != 0 { + os.write_int64(1, self.value)?; + } + os.write_unknown_fields(self.get_unknown_fields())?; + ::std::result::Result::Ok(()) + } + + fn get_cached_size(&self) -> u32 { + self.cached_size.get() + } + + fn get_unknown_fields(&self) -> &::protobuf::UnknownFields { + &self.unknown_fields + } + + fn mut_unknown_fields(&mut self) -> &mut ::protobuf::UnknownFields { + &mut self.unknown_fields + } + + fn as_any(&self) -> &dyn (::std::any::Any) { + self as &dyn (::std::any::Any) + } + fn as_any_mut(&mut self) -> &mut dyn (::std::any::Any) { + self as &mut dyn (::std::any::Any) + } + fn into_any(self: ::std::boxed::Box) -> ::std::boxed::Box { + self + } + + fn descriptor(&self) -> &'static ::protobuf::reflect::MessageDescriptor { + Self::descriptor_static() + } + + fn new() -> RevId { + RevId::new() + } + + fn descriptor_static() -> &'static ::protobuf::reflect::MessageDescriptor { + static descriptor: ::protobuf::rt::LazyV2<::protobuf::reflect::MessageDescriptor> = ::protobuf::rt::LazyV2::INIT; + descriptor.get(|| { + let mut fields = ::std::vec::Vec::new(); + fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeInt64>( + "value", + |m: &RevId| { &m.value }, + |m: &mut RevId| { &mut m.value }, + )); + ::protobuf::reflect::MessageDescriptor::new_pb_name::( + "RevId", + fields, + file_descriptor_proto() + ) + }) + } + + fn default_instance() -> &'static RevId { + static instance: ::protobuf::rt::LazyV2 = ::protobuf::rt::LazyV2::INIT; + instance.get(RevId::new) + } +} + +impl ::protobuf::Clear for RevId { + fn clear(&mut self) { + self.value = 0; + self.unknown_fields.clear(); + } +} + +impl ::std::fmt::Debug for RevId { + fn fmt(&self, f: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result { + ::protobuf::text_format::fmt(self, f) + } +} + +impl ::protobuf::reflect::ProtobufValue for RevId { + fn as_ref(&self) -> ::protobuf::reflect::ReflectValueRef { + ::protobuf::reflect::ReflectValueRef::Message(self) + } +} + #[derive(PartialEq,Clone,Default)] pub struct RevisionRange { // message fields @@ -799,53 +799,52 @@ impl ::protobuf::reflect::ProtobufValue for RevType { } static file_descriptor_proto_data: &'static [u8] = b"\ - \n\x0erevision.proto\"\x1d\n\x05RevId\x12\x14\n\x05value\x18\x01\x20\x01\ - (\x03R\x05value\"\xa3\x01\n\x08Revision\x12\x1e\n\x0bbase_rev_id\x18\x01\ - \x20\x01(\x03R\tbaseRevId\x12\x15\n\x06rev_id\x18\x02\x20\x01(\x03R\x05r\ - evId\x12\x1d\n\ndelta_data\x18\x03\x20\x01(\x0cR\tdeltaData\x12\x10\n\ - \x03md5\x18\x04\x20\x01(\tR\x03md5\x12\x15\n\x06doc_id\x18\x05\x20\x01(\ - \tR\x05docId\x12\x18\n\x02ty\x18\x06\x20\x01(\x0e2\x08.RevTypeR\x02ty\"N\ - \n\rRevisionRange\x12\x15\n\x06doc_id\x18\x01\x20\x01(\tR\x05docId\x12\ - \x14\n\x05start\x18\x02\x20\x01(\x03R\x05start\x12\x10\n\x03end\x18\x03\ - \x20\x01(\x03R\x03end*\x20\n\x07RevType\x12\t\n\x05Local\x10\0\x12\n\n\ - \x06Remote\x10\x01J\xea\x05\n\x06\x12\x04\0\0\x15\x01\n\x08\n\x01\x0c\ - \x12\x03\0\0\x12\n\n\n\x02\x04\0\x12\x04\x02\0\x04\x01\n\n\n\x03\x04\0\ - \x01\x12\x03\x02\x08\r\n\x0b\n\x04\x04\0\x02\0\x12\x03\x03\x04\x14\n\x0c\ - \n\x05\x04\0\x02\0\x05\x12\x03\x03\x04\t\n\x0c\n\x05\x04\0\x02\0\x01\x12\ - \x03\x03\n\x0f\n\x0c\n\x05\x04\0\x02\0\x03\x12\x03\x03\x12\x13\n\n\n\x02\ - \x04\x01\x12\x04\x05\0\x0c\x01\n\n\n\x03\x04\x01\x01\x12\x03\x05\x08\x10\ - \n\x0b\n\x04\x04\x01\x02\0\x12\x03\x06\x04\x1a\n\x0c\n\x05\x04\x01\x02\0\ - \x05\x12\x03\x06\x04\t\n\x0c\n\x05\x04\x01\x02\0\x01\x12\x03\x06\n\x15\n\ - \x0c\n\x05\x04\x01\x02\0\x03\x12\x03\x06\x18\x19\n\x0b\n\x04\x04\x01\x02\ - \x01\x12\x03\x07\x04\x15\n\x0c\n\x05\x04\x01\x02\x01\x05\x12\x03\x07\x04\ - \t\n\x0c\n\x05\x04\x01\x02\x01\x01\x12\x03\x07\n\x10\n\x0c\n\x05\x04\x01\ - \x02\x01\x03\x12\x03\x07\x13\x14\n\x0b\n\x04\x04\x01\x02\x02\x12\x03\x08\ - \x04\x19\n\x0c\n\x05\x04\x01\x02\x02\x05\x12\x03\x08\x04\t\n\x0c\n\x05\ - \x04\x01\x02\x02\x01\x12\x03\x08\n\x14\n\x0c\n\x05\x04\x01\x02\x02\x03\ - \x12\x03\x08\x17\x18\n\x0b\n\x04\x04\x01\x02\x03\x12\x03\t\x04\x13\n\x0c\ - \n\x05\x04\x01\x02\x03\x05\x12\x03\t\x04\n\n\x0c\n\x05\x04\x01\x02\x03\ - \x01\x12\x03\t\x0b\x0e\n\x0c\n\x05\x04\x01\x02\x03\x03\x12\x03\t\x11\x12\ - \n\x0b\n\x04\x04\x01\x02\x04\x12\x03\n\x04\x16\n\x0c\n\x05\x04\x01\x02\ - \x04\x05\x12\x03\n\x04\n\n\x0c\n\x05\x04\x01\x02\x04\x01\x12\x03\n\x0b\ - \x11\n\x0c\n\x05\x04\x01\x02\x04\x03\x12\x03\n\x14\x15\n\x0b\n\x04\x04\ - \x01\x02\x05\x12\x03\x0b\x04\x13\n\x0c\n\x05\x04\x01\x02\x05\x06\x12\x03\ - \x0b\x04\x0b\n\x0c\n\x05\x04\x01\x02\x05\x01\x12\x03\x0b\x0c\x0e\n\x0c\n\ - \x05\x04\x01\x02\x05\x03\x12\x03\x0b\x11\x12\n\n\n\x02\x04\x02\x12\x04\r\ - \0\x11\x01\n\n\n\x03\x04\x02\x01\x12\x03\r\x08\x15\n\x0b\n\x04\x04\x02\ - \x02\0\x12\x03\x0e\x04\x16\n\x0c\n\x05\x04\x02\x02\0\x05\x12\x03\x0e\x04\ - \n\n\x0c\n\x05\x04\x02\x02\0\x01\x12\x03\x0e\x0b\x11\n\x0c\n\x05\x04\x02\ - \x02\0\x03\x12\x03\x0e\x14\x15\n\x0b\n\x04\x04\x02\x02\x01\x12\x03\x0f\ - \x04\x14\n\x0c\n\x05\x04\x02\x02\x01\x05\x12\x03\x0f\x04\t\n\x0c\n\x05\ - \x04\x02\x02\x01\x01\x12\x03\x0f\n\x0f\n\x0c\n\x05\x04\x02\x02\x01\x03\ - \x12\x03\x0f\x12\x13\n\x0b\n\x04\x04\x02\x02\x02\x12\x03\x10\x04\x12\n\ - \x0c\n\x05\x04\x02\x02\x02\x05\x12\x03\x10\x04\t\n\x0c\n\x05\x04\x02\x02\ - \x02\x01\x12\x03\x10\n\r\n\x0c\n\x05\x04\x02\x02\x02\x03\x12\x03\x10\x10\ - \x11\n\n\n\x02\x05\0\x12\x04\x12\0\x15\x01\n\n\n\x03\x05\0\x01\x12\x03\ - \x12\x05\x0c\n\x0b\n\x04\x05\0\x02\0\x12\x03\x13\x04\x0e\n\x0c\n\x05\x05\ - \0\x02\0\x01\x12\x03\x13\x04\t\n\x0c\n\x05\x05\0\x02\0\x02\x12\x03\x13\ - \x0c\r\n\x0b\n\x04\x05\0\x02\x01\x12\x03\x14\x04\x0f\n\x0c\n\x05\x05\0\ - \x02\x01\x01\x12\x03\x14\x04\n\n\x0c\n\x05\x05\0\x02\x01\x02\x12\x03\x14\ - \r\x0eb\x06proto3\ + \n\x0bmodel.proto\"\xa3\x01\n\x08Revision\x12\x1e\n\x0bbase_rev_id\x18\ + \x01\x20\x01(\x03R\tbaseRevId\x12\x15\n\x06rev_id\x18\x02\x20\x01(\x03R\ + \x05revId\x12\x1d\n\ndelta_data\x18\x03\x20\x01(\x0cR\tdeltaData\x12\x10\ + \n\x03md5\x18\x04\x20\x01(\tR\x03md5\x12\x15\n\x06doc_id\x18\x05\x20\x01\ + (\tR\x05docId\x12\x18\n\x02ty\x18\x06\x20\x01(\x0e2\x08.RevTypeR\x02ty\"\ + \x1d\n\x05RevId\x12\x14\n\x05value\x18\x01\x20\x01(\x03R\x05value\"N\n\r\ + RevisionRange\x12\x15\n\x06doc_id\x18\x01\x20\x01(\tR\x05docId\x12\x14\n\ + \x05start\x18\x02\x20\x01(\x03R\x05start\x12\x10\n\x03end\x18\x03\x20\ + \x01(\x03R\x03end*\x20\n\x07RevType\x12\t\n\x05Local\x10\0\x12\n\n\x06Re\ + mote\x10\x01J\xea\x05\n\x06\x12\x04\0\0\x15\x01\n\x08\n\x01\x0c\x12\x03\ + \0\0\x12\n\n\n\x02\x04\0\x12\x04\x02\0\t\x01\n\n\n\x03\x04\0\x01\x12\x03\ + \x02\x08\x10\n\x0b\n\x04\x04\0\x02\0\x12\x03\x03\x04\x1a\n\x0c\n\x05\x04\ + \0\x02\0\x05\x12\x03\x03\x04\t\n\x0c\n\x05\x04\0\x02\0\x01\x12\x03\x03\n\ + \x15\n\x0c\n\x05\x04\0\x02\0\x03\x12\x03\x03\x18\x19\n\x0b\n\x04\x04\0\ + \x02\x01\x12\x03\x04\x04\x15\n\x0c\n\x05\x04\0\x02\x01\x05\x12\x03\x04\ + \x04\t\n\x0c\n\x05\x04\0\x02\x01\x01\x12\x03\x04\n\x10\n\x0c\n\x05\x04\0\ + \x02\x01\x03\x12\x03\x04\x13\x14\n\x0b\n\x04\x04\0\x02\x02\x12\x03\x05\ + \x04\x19\n\x0c\n\x05\x04\0\x02\x02\x05\x12\x03\x05\x04\t\n\x0c\n\x05\x04\ + \0\x02\x02\x01\x12\x03\x05\n\x14\n\x0c\n\x05\x04\0\x02\x02\x03\x12\x03\ + \x05\x17\x18\n\x0b\n\x04\x04\0\x02\x03\x12\x03\x06\x04\x13\n\x0c\n\x05\ + \x04\0\x02\x03\x05\x12\x03\x06\x04\n\n\x0c\n\x05\x04\0\x02\x03\x01\x12\ + \x03\x06\x0b\x0e\n\x0c\n\x05\x04\0\x02\x03\x03\x12\x03\x06\x11\x12\n\x0b\ + \n\x04\x04\0\x02\x04\x12\x03\x07\x04\x16\n\x0c\n\x05\x04\0\x02\x04\x05\ + \x12\x03\x07\x04\n\n\x0c\n\x05\x04\0\x02\x04\x01\x12\x03\x07\x0b\x11\n\ + \x0c\n\x05\x04\0\x02\x04\x03\x12\x03\x07\x14\x15\n\x0b\n\x04\x04\0\x02\ + \x05\x12\x03\x08\x04\x13\n\x0c\n\x05\x04\0\x02\x05\x06\x12\x03\x08\x04\ + \x0b\n\x0c\n\x05\x04\0\x02\x05\x01\x12\x03\x08\x0c\x0e\n\x0c\n\x05\x04\0\ + \x02\x05\x03\x12\x03\x08\x11\x12\n\n\n\x02\x04\x01\x12\x04\n\0\x0c\x01\n\ + \n\n\x03\x04\x01\x01\x12\x03\n\x08\r\n\x0b\n\x04\x04\x01\x02\0\x12\x03\ + \x0b\x04\x14\n\x0c\n\x05\x04\x01\x02\0\x05\x12\x03\x0b\x04\t\n\x0c\n\x05\ + \x04\x01\x02\0\x01\x12\x03\x0b\n\x0f\n\x0c\n\x05\x04\x01\x02\0\x03\x12\ + \x03\x0b\x12\x13\n\n\n\x02\x04\x02\x12\x04\r\0\x11\x01\n\n\n\x03\x04\x02\ + \x01\x12\x03\r\x08\x15\n\x0b\n\x04\x04\x02\x02\0\x12\x03\x0e\x04\x16\n\ + \x0c\n\x05\x04\x02\x02\0\x05\x12\x03\x0e\x04\n\n\x0c\n\x05\x04\x02\x02\0\ + \x01\x12\x03\x0e\x0b\x11\n\x0c\n\x05\x04\x02\x02\0\x03\x12\x03\x0e\x14\ + \x15\n\x0b\n\x04\x04\x02\x02\x01\x12\x03\x0f\x04\x14\n\x0c\n\x05\x04\x02\ + \x02\x01\x05\x12\x03\x0f\x04\t\n\x0c\n\x05\x04\x02\x02\x01\x01\x12\x03\ + \x0f\n\x0f\n\x0c\n\x05\x04\x02\x02\x01\x03\x12\x03\x0f\x12\x13\n\x0b\n\ + \x04\x04\x02\x02\x02\x12\x03\x10\x04\x12\n\x0c\n\x05\x04\x02\x02\x02\x05\ + \x12\x03\x10\x04\t\n\x0c\n\x05\x04\x02\x02\x02\x01\x12\x03\x10\n\r\n\x0c\ + \n\x05\x04\x02\x02\x02\x03\x12\x03\x10\x10\x11\n\n\n\x02\x05\0\x12\x04\ + \x12\0\x15\x01\n\n\n\x03\x05\0\x01\x12\x03\x12\x05\x0c\n\x0b\n\x04\x05\0\ + \x02\0\x12\x03\x13\x04\x0e\n\x0c\n\x05\x05\0\x02\0\x01\x12\x03\x13\x04\t\ + \n\x0c\n\x05\x05\0\x02\0\x02\x12\x03\x13\x0c\r\n\x0b\n\x04\x05\0\x02\x01\ + \x12\x03\x14\x04\x0f\n\x0c\n\x05\x05\0\x02\x01\x01\x12\x03\x14\x04\n\n\ + \x0c\n\x05\x05\0\x02\x01\x02\x12\x03\x14\r\x0eb\x06proto3\ "; static file_descriptor_proto_lazy: ::protobuf::rt::LazyV2<::protobuf::descriptor::FileDescriptorProto> = ::protobuf::rt::LazyV2::INIT; diff --git a/shared-lib/flowy-document-infra/src/protobuf/proto/revision.proto b/shared-lib/lib-ot/src/protobuf/proto/model.proto similarity index 100% rename from shared-lib/flowy-document-infra/src/protobuf/proto/revision.proto rename to shared-lib/lib-ot/src/protobuf/proto/model.proto index 44a3137bc1..9ed4f2ea15 100644 --- a/shared-lib/flowy-document-infra/src/protobuf/proto/revision.proto +++ b/shared-lib/lib-ot/src/protobuf/proto/model.proto @@ -1,8 +1,5 @@ syntax = "proto3"; -message RevId { - int64 value = 1; -} message Revision { int64 base_rev_id = 1; int64 rev_id = 2; @@ -11,6 +8,9 @@ message Revision { string doc_id = 5; RevType ty = 6; } +message RevId { + int64 value = 1; +} message RevisionRange { string doc_id = 1; int64 start = 2; diff --git a/shared-lib/lib-ot/src/revision/cache.rs b/shared-lib/lib-ot/src/revision/cache.rs new file mode 100644 index 0000000000..0a4023b4d5 --- /dev/null +++ b/shared-lib/lib-ot/src/revision/cache.rs @@ -0,0 +1,109 @@ +use crate::{ + errors::OTError, + revision::{RevId, Revision, RevisionRange}, +}; +use dashmap::{mapref::one::RefMut, DashMap}; +use std::{collections::VecDeque, sync::Arc}; +use tokio::sync::{broadcast, RwLock}; + +pub trait RevisionDiskCache { + fn create_revision(&self, revision: &Revision) -> Result<(), OTError>; + fn revisions_in_range(&self, range: RevisionRange) -> Result>, OTError>; + fn read_revision(&self, rev_id: i64) -> Result, OTError>; +} + +pub struct RevisionMemoryCache { + revs_map: Arc>, + pending_revs: Arc>>, +} + +impl std::default::Default for RevisionMemoryCache { + fn default() -> Self { + let pending_revs = Arc::new(RwLock::new(VecDeque::new())); + RevisionMemoryCache { + revs_map: Arc::new(DashMap::new()), + pending_revs, + } + } +} + +impl RevisionMemoryCache { + pub fn new() -> Self { RevisionMemoryCache::default() } + + pub async fn add_revision(&self, revision: Revision) -> Result<(), OTError> { + if self.revs_map.contains_key(&revision.rev_id) { + return Err(OTError::duplicate_revision().context(format!("Duplicate revision id: {}", revision.rev_id))); + } + + self.pending_revs.write().await.push_back(revision.rev_id); + self.revs_map.insert(revision.rev_id, RevisionRecord::new(revision)); + Ok(()) + } + + pub async fn mut_revision(&self, rev_id: i64, f: F) + where + F: Fn(RefMut), + { + if let Some(m_revision) = self.revs_map.get_mut(&rev_id) { + f(m_revision) + } else { + log::error!("Can't find revision with id {}", rev_id); + } + } + + pub async fn revisions_in_range(&self, range: RevisionRange) -> Result>, OTError> { + let revs = range + .iter() + .flat_map(|rev_id| match self.revs_map.get(&rev_id) { + None => None, + Some(rev) => Some(rev.revision.clone()), + }) + .collect::>(); + + if revs.len() == range.len() as usize { + Ok(Some(revs)) + } else { + Ok(None) + } + } +} + +pub type RevIdReceiver = broadcast::Receiver; +pub type RevIdSender = broadcast::Sender; + +pub enum RevState { + Local = 0, + Acked = 1, +} + +pub struct RevisionRecord { + pub revision: Revision, + pub state: RevState, +} + +impl RevisionRecord { + pub fn new(revision: Revision) -> Self { + Self { + revision, + state: RevState::Local, + } + } +} + +pub struct PendingRevId { + pub rev_id: i64, + pub sender: RevIdSender, +} + +impl PendingRevId { + pub fn new(rev_id: i64, sender: RevIdSender) -> Self { Self { rev_id, sender } } + + pub fn finish(&self, rev_id: i64) -> bool { + if self.rev_id > rev_id { + false + } else { + let _ = self.sender.send(self.rev_id); + true + } + } +} diff --git a/shared-lib/lib-ot/src/revision/mod.rs b/shared-lib/lib-ot/src/revision/mod.rs new file mode 100644 index 0000000000..9848b59af1 --- /dev/null +++ b/shared-lib/lib-ot/src/revision/mod.rs @@ -0,0 +1,5 @@ +mod cache; +mod model; + +pub use cache::*; +pub use model::*; diff --git a/shared-lib/flowy-document-infra/src/entities/doc/revision.rs b/shared-lib/lib-ot/src/revision/model.rs similarity index 79% rename from shared-lib/flowy-document-infra/src/entities/doc/revision.rs rename to shared-lib/lib-ot/src/revision/model.rs index cb0ea3ce25..7b17e85699 100644 --- a/shared-lib/flowy-document-infra/src/entities/doc/revision.rs +++ b/shared-lib/lib-ot/src/revision/model.rs @@ -1,62 +1,7 @@ -use crate::{entities::doc::Doc, util::md5}; +use crate::rich_text::RichTextDelta; use flowy_derive::{ProtoBuf, ProtoBuf_Enum}; -use lib_ot::rich_text::RichTextDelta; use std::{fmt::Formatter, ops::RangeInclusive}; -#[derive(Debug, ProtoBuf_Enum, Clone, Eq, PartialEq)] -pub enum RevType { - Local = 0, - Remote = 1, -} - -impl RevType { - pub fn is_local(&self) -> bool { self == &RevType::Local } -} - -impl std::default::Default for RevType { - fn default() -> Self { RevType::Local } -} - -// [[i64 to bytes]] -// use byteorder::{BigEndian, ReadBytesExt}; -// use std::{io::Cursor}; -// impl std::convert::TryFrom for RevId { -// type Error = DocError; -// -// fn try_from(bytes: Bytes) -> Result { -// // let mut wtr = vec![]; -// // let _ = wtr.write_i64::(revision.rev_id); -// -// let mut rdr = Cursor::new(bytes); -// match rdr.read_i64::() { -// Ok(rev_id) => Ok(RevId(rev_id)), -// Err(e) => Err(DocError::internal().context(e)), -// } -// } -// } - -#[derive(Clone, Debug, ProtoBuf, Default)] -pub struct RevId { - #[pb(index = 1)] - pub value: i64, -} - -impl AsRef for RevId { - fn as_ref(&self) -> &i64 { &self.value } -} - -impl std::convert::From for i64 { - fn from(rev_id: RevId) -> Self { rev_id.value } -} - -impl std::convert::From for RevId { - fn from(value: i64) -> Self { RevId { value } } -} - -impl std::fmt::Display for RevId { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.write_fmt(format_args!("{}", self.value)) } -} - #[derive(PartialEq, Eq, Clone, Default, ProtoBuf)] pub struct Revision { #[pb(index = 1)] @@ -127,9 +72,40 @@ impl Revision { } } -pub fn revision_from_doc(doc: Doc, ty: RevType) -> Revision { - let delta_data = doc.data.as_bytes(); - Revision::new(doc.base_rev_id, doc.rev_id, delta_data.to_owned(), &doc.id, ty) +#[derive(Clone, Debug, ProtoBuf, Default)] +pub struct RevId { + #[pb(index = 1)] + pub value: i64, +} + +impl AsRef for RevId { + fn as_ref(&self) -> &i64 { &self.value } +} + +impl std::convert::From for i64 { + fn from(rev_id: RevId) -> Self { rev_id.value } +} + +impl std::convert::From for RevId { + fn from(value: i64) -> Self { RevId { value } } +} + +impl std::fmt::Display for RevId { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.write_fmt(format_args!("{}", self.value)) } +} + +#[derive(Debug, ProtoBuf_Enum, Clone, Eq, PartialEq)] +pub enum RevType { + Local = 0, + Remote = 1, +} + +impl RevType { + pub fn is_local(&self) -> bool { self == &RevType::Local } +} + +impl std::default::Default for RevType { + fn default() -> Self { RevType::Local } } #[derive(Debug, Clone, Default, ProtoBuf)] @@ -161,3 +137,9 @@ impl RevisionRange { RangeInclusive::new(self.start, self.end) } } + +#[inline] +pub fn md5>(data: T) -> String { + let md5 = format!("{:x}", md5::compute(data)); + md5 +}