save revision to disk if not exisgt

This commit is contained in:
appflowy 2021-10-06 15:23:38 +08:00
parent 1c8d2c5ac0
commit 04f8fc38a8
25 changed files with 237 additions and 162 deletions

View File

@ -2,6 +2,7 @@ extend = [
{ path = "scripts/makefile/desktop.toml" },
{ path = "scripts/makefile/protobuf.toml" },
{ path = "scripts/makefile/tests.toml" },
{ path = "scripts/makefile/docker.toml" },
]
[env]

View File

@ -9,14 +9,14 @@ import 'package:flowy_sdk/protobuf/flowy-document/doc.pb.dart';
import 'package:flowy_sdk/protobuf/flowy-workspace/errors.pb.dart';
class FlowyDoc implements EditorDeltaSender {
final Doc doc;
final DocDelta doc;
final IDoc iDocImpl;
Document data;
FlowyDoc({required this.doc, required this.data, required this.iDocImpl}) {
data.sender = this;
}
String get id => doc.id;
String get id => doc.docId;
@override
void sendNewDelta(Delta changeset, Delta delta) async {
@ -40,7 +40,8 @@ class FlowyDoc implements EditorDeltaSender {
}
abstract class IDoc {
Future<Either<Doc, WorkspaceError>> readDoc();
Future<Either<Doc, WorkspaceError>> applyChangeset({required String json});
Future<Either<DocDelta, WorkspaceError>> readDoc();
Future<Either<DocDelta, WorkspaceError>> applyChangeset(
{required String json});
Future<Either<Unit, WorkspaceError>> closeDoc();
}

View File

@ -18,13 +18,14 @@ class IDocImpl extends IDoc {
}
@override
Future<Either<Doc, WorkspaceError>> readDoc() async {
Future<Either<DocDelta, WorkspaceError>> readDoc() async {
final docOrFail = await repo.readDoc();
return docOrFail;
}
@override
Future<Either<Doc, WorkspaceError>> applyChangeset({required String json}) {
Future<Either<DocDelta, WorkspaceError>> applyChangeset(
{required String json}) {
return repo.applyDelta(data: json);
}
}

View File

@ -10,12 +10,12 @@ class DocRepository {
required this.docId,
});
Future<Either<Doc, WorkspaceError>> readDoc() {
Future<Either<DocDelta, WorkspaceError>> readDoc() {
final request = OpenViewRequest.create()..viewId = docId;
return WorkspaceEventOpenView(request).send();
}
Future<Either<Doc, WorkspaceError>> applyDelta({required String data}) {
Future<Either<DocDelta, WorkspaceError>> applyDelta({required String data}) {
final request = DocDelta.create()
..docId = docId
..data = data;

View File

@ -241,14 +241,14 @@ class WorkspaceEventOpenView {
OpenViewRequest request;
WorkspaceEventOpenView(this.request);
Future<Either<Doc, WorkspaceError>> send() {
Future<Either<DocDelta, WorkspaceError>> send() {
final request = FFIRequest.create()
..event = WorkspaceEvent.OpenView.toString()
..payload = requestToBytes(this.request);
return Dispatch.asyncRequest(request)
.then((bytesResult) => bytesResult.fold(
(okBytes) => left(Doc.fromBuffer(okBytes)),
(okBytes) => left(DocDelta.fromBuffer(okBytes)),
(errBytes) => right(WorkspaceError.fromBuffer(errBytes)),
));
}
@ -258,14 +258,14 @@ class WorkspaceEventApplyDocDelta {
DocDelta request;
WorkspaceEventApplyDocDelta(this.request);
Future<Either<Doc, WorkspaceError>> send() {
Future<Either<DocDelta, WorkspaceError>> send() {
final request = FFIRequest.create()
..event = WorkspaceEvent.ApplyDocDelta.toString()
..payload = requestToBytes(this.request);
return Dispatch.asyncRequest(request)
.then((bytesResult) => bytesResult.fold(
(okBytes) => left(Doc.fromBuffer(okBytes)),
(okBytes) => left(DocDelta.fromBuffer(okBytes)),
(errBytes) => right(WorkspaceError.fromBuffer(errBytes)),
));
}

View File

@ -76,6 +76,7 @@ class Doc extends $pb.GeneratedMessage {
..aOS(1, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'id')
..aOS(2, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'data')
..aInt64(3, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'revId')
..aInt64(4, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'baseRevId')
..hasRequiredFields = false
;
@ -84,6 +85,7 @@ class Doc extends $pb.GeneratedMessage {
$core.String? id,
$core.String? data,
$fixnum.Int64? revId,
$fixnum.Int64? baseRevId,
}) {
final _result = create();
if (id != null) {
@ -95,6 +97,9 @@ class Doc extends $pb.GeneratedMessage {
if (revId != null) {
_result.revId = revId;
}
if (baseRevId != null) {
_result.baseRevId = baseRevId;
}
return _result;
}
factory Doc.fromBuffer($core.List<$core.int> i, [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) => create()..mergeFromBuffer(i, r);
@ -144,6 +149,15 @@ class Doc extends $pb.GeneratedMessage {
$core.bool hasRevId() => $_has(2);
@$pb.TagNumber(3)
void clearRevId() => clearField(3);
@$pb.TagNumber(4)
$fixnum.Int64 get baseRevId => $_getI64(3);
@$pb.TagNumber(4)
set baseRevId($fixnum.Int64 v) { $_setInt64(3, v); }
@$pb.TagNumber(4)
$core.bool hasBaseRevId() => $_has(3);
@$pb.TagNumber(4)
void clearBaseRevId() => clearField(4);
}
class UpdateDocParams extends $pb.GeneratedMessage {

View File

@ -26,11 +26,12 @@ const Doc$json = const {
const {'1': 'id', '3': 1, '4': 1, '5': 9, '10': 'id'},
const {'1': 'data', '3': 2, '4': 1, '5': 9, '10': 'data'},
const {'1': 'rev_id', '3': 3, '4': 1, '5': 3, '10': 'revId'},
const {'1': 'base_rev_id', '3': 4, '4': 1, '5': 3, '10': 'baseRevId'},
],
};
/// Descriptor for `Doc`. Decode as a `google.protobuf.DescriptorProto`.
final $typed_data.Uint8List docDescriptor = $convert.base64Decode('CgNEb2MSDgoCaWQYASABKAlSAmlkEhIKBGRhdGEYAiABKAlSBGRhdGESFQoGcmV2X2lkGAMgASgDUgVyZXZJZA==');
final $typed_data.Uint8List docDescriptor = $convert.base64Decode('CgNEb2MSDgoCaWQYASABKAlSAmlkEhIKBGRhdGEYAiABKAlSBGRhdGESFQoGcmV2X2lkGAMgASgDUgVyZXZJZBIeCgtiYXNlX3Jldl9pZBgEIAEoA1IJYmFzZVJldklk');
@$core.Deprecated('Use updateDocParamsDescriptor instead')
const UpdateDocParams$json = const {
'1': 'UpdateDocParams',

View File

@ -1,3 +1,3 @@
include scripts/database/database.mk
.PHONY: init_database add_migrations run_migrations reset_db echo_db_url
.PHONY: init_postgres init_database add_migrations run_migrations reset_db echo_db_url

View File

@ -2,9 +2,16 @@
set -x
set -eo pipefail
#if [[ -z "${RESET}" ]]
#then
# docker stop flowy
# docker rm flowy
#fi
if [[ -z "${SKIP_DOCKER}" ]]
then
docker run \
--name="flowy" \
-e POSTGRES_USER=${DB_USER} \
-e POSTGRES_PASSWORD=${DB_PASSWORD} \
-e POSTGRES_DB=${DB_NAME} \

View File

@ -15,7 +15,7 @@ use flowy_user::services::user::UserSession;
use crate::helper::{spawn_server, TestServer};
use flowy_document::protobuf::UpdateDocParams;
use flowy_ot::core::{Attribute, Interval};
use flowy_ot::core::{Attribute, Delta, Interval};
use parking_lot::RwLock;
pub struct DocumentTest {
@ -152,11 +152,14 @@ async fn run_scripts(context: Arc<RwLock<ScriptContext>>, scripts: Vec<DocScript
}
fn assert_eq(expect: &str, receive: &str) {
if expect != receive {
log::error!("expect: {}", expect);
log::error!("but receive: {}", receive);
let expected_delta: Delta = serde_json::from_str(expect).unwrap();
let target_delta: Delta = serde_json::from_str(receive).unwrap();
if expected_delta != target_delta {
log::error!("✅ expect: {}", expect,);
log::error!("❌ receive: {}", receive);
}
assert_eq!(expect, receive);
assert_eq!(target_delta, expected_delta);
}
async fn create_doc(flowy_test: &FlowyTest) -> String {

View File

@ -1,4 +1,6 @@
use crate::errors::DocResult;
use flowy_derive::ProtoBuf;
use flowy_ot::core::Delta;
#[derive(ProtoBuf, Default, Debug, Clone)]
pub struct CreateDocParams {
@ -28,6 +30,16 @@ pub struct Doc {
#[pb(index = 3)]
pub rev_id: i64,
#[pb(index = 4)]
pub base_rev_id: i64,
}
impl Doc {
pub fn delta(&self) -> DocResult<Delta> {
let delta = Delta::from_bytes(&self.data)?;
Ok(delta)
}
}
#[derive(ProtoBuf, Default, Debug, Clone)]

View File

@ -1,5 +1,6 @@
use crate::services::util::md5;
use crate::entities::doc::Doc;
use flowy_derive::{ProtoBuf, ProtoBuf_Enum};
use flowy_ot::core::Delta;
use std::fmt::Formatter;
@ -116,6 +117,18 @@ impl Revision {
}
}
pub fn revision_from_doc(doc: Doc, ty: RevType) -> Revision {
let delta_data = doc.data.as_bytes();
let revision = Revision::new(
doc.base_rev_id.clone(),
doc.rev_id.clone(),
delta_data.to_owned(),
&doc.id,
ty,
);
revision
}
#[derive(Debug, Clone, Default, ProtoBuf)]
pub struct RevisionRange {
#[pb(index = 1)]

View File

@ -1,12 +1,5 @@
use std::sync::Arc;
use diesel::SqliteConnection;
use flowy_database::ConnectionPool;
use flowy_net::config::ServerConfig;
use crate::{
entities::doc::{CreateDocParams, Doc, DocDelta, QueryDocParams},
entities::doc::{CreateDocParams, DocDelta, QueryDocParams},
errors::DocError,
services::{
doc::{doc_controller::DocController, edit::ClientEditDoc},
@ -14,6 +7,9 @@ use crate::{
ws::WsDocumentManager,
},
};
use flowy_database::ConnectionPool;
use flowy_net::config::ServerConfig;
use std::sync::Arc;
pub trait DocumentUser: Send + Sync {
fn user_dir(&self) -> Result<String, DocError>;
@ -41,13 +37,13 @@ impl FlowyDocument {
Ok(())
}
pub fn create(&self, params: CreateDocParams, conn: &SqliteConnection) -> Result<(), DocError> {
let _ = self.doc_ctrl.create(params, conn)?;
pub fn create(&self, params: CreateDocParams) -> Result<(), DocError> {
let _ = self.doc_ctrl.create(params)?;
Ok(())
}
pub fn delete(&self, params: QueryDocParams, conn: &SqliteConnection) -> Result<(), DocError> {
let _ = self.doc_ctrl.delete(params, conn)?;
pub fn delete(&self, params: QueryDocParams) -> Result<(), DocError> {
let _ = self.doc_ctrl.delete(params)?;
Ok(())
}
@ -60,7 +56,7 @@ impl FlowyDocument {
Ok(edit_context)
}
pub async fn apply_doc_delta(&self, params: DocDelta) -> Result<Doc, DocError> {
pub async fn apply_doc_delta(&self, params: DocDelta) -> Result<DocDelta, DocError> {
// workaround: compare the rust's delta with flutter's delta. Will be removed
// very soon
let doc = self.doc_ctrl.edit_doc(params.clone()).await?;

View File

@ -230,6 +230,7 @@ pub struct Doc {
pub id: ::std::string::String,
pub data: ::std::string::String,
pub rev_id: i64,
pub base_rev_id: i64,
// special fields
pub unknown_fields: ::protobuf::UnknownFields,
pub cached_size: ::protobuf::CachedSize,
@ -312,6 +313,21 @@ impl Doc {
pub fn set_rev_id(&mut self, v: i64) {
self.rev_id = v;
}
// int64 base_rev_id = 4;
pub fn get_base_rev_id(&self) -> i64 {
self.base_rev_id
}
pub fn clear_base_rev_id(&mut self) {
self.base_rev_id = 0;
}
// Param is passed by value, moved
pub fn set_base_rev_id(&mut self, v: i64) {
self.base_rev_id = v;
}
}
impl ::protobuf::Message for Doc {
@ -336,6 +352,13 @@ impl ::protobuf::Message for Doc {
let tmp = is.read_int64()?;
self.rev_id = tmp;
},
4 => {
if wire_type != ::protobuf::wire_format::WireTypeVarint {
return ::std::result::Result::Err(::protobuf::rt::unexpected_wire_type(wire_type));
}
let tmp = is.read_int64()?;
self.base_rev_id = tmp;
},
_ => {
::protobuf::rt::read_unknown_or_skip_group(field_number, wire_type, is, self.mut_unknown_fields())?;
},
@ -357,6 +380,9 @@ impl ::protobuf::Message for Doc {
if self.rev_id != 0 {
my_size += ::protobuf::rt::value_size(3, self.rev_id, ::protobuf::wire_format::WireTypeVarint);
}
if self.base_rev_id != 0 {
my_size += ::protobuf::rt::value_size(4, self.base_rev_id, ::protobuf::wire_format::WireTypeVarint);
}
my_size += ::protobuf::rt::unknown_fields_size(self.get_unknown_fields());
self.cached_size.set(my_size);
my_size
@ -372,6 +398,9 @@ impl ::protobuf::Message for Doc {
if self.rev_id != 0 {
os.write_int64(3, self.rev_id)?;
}
if self.base_rev_id != 0 {
os.write_int64(4, self.base_rev_id)?;
}
os.write_unknown_fields(self.get_unknown_fields())?;
::std::result::Result::Ok(())
}
@ -425,6 +454,11 @@ impl ::protobuf::Message for Doc {
|m: &Doc| { &m.rev_id },
|m: &mut Doc| { &mut m.rev_id },
));
fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeInt64>(
"base_rev_id",
|m: &Doc| { &m.base_rev_id },
|m: &mut Doc| { &mut m.base_rev_id },
));
::protobuf::reflect::MessageDescriptor::new_pb_name::<Doc>(
"Doc",
fields,
@ -444,6 +478,7 @@ impl ::protobuf::Clear for Doc {
self.id.clear();
self.data.clear();
self.rev_id = 0;
self.base_rev_id = 0;
self.unknown_fields.clear();
}
}
@ -1294,63 +1329,67 @@ impl ::protobuf::reflect::ProtobufValue for QueryDocParams {
static file_descriptor_proto_data: &'static [u8] = b"\
\n\tdoc.proto\"5\n\x0fCreateDocParams\x12\x0e\n\x02id\x18\x01\x20\x01(\t\
R\x02id\x12\x12\n\x04data\x18\x02\x20\x01(\tR\x04data\"@\n\x03Doc\x12\
R\x02id\x12\x12\n\x04data\x18\x02\x20\x01(\tR\x04data\"`\n\x03Doc\x12\
\x0e\n\x02id\x18\x01\x20\x01(\tR\x02id\x12\x12\n\x04data\x18\x02\x20\x01\
(\tR\x04data\x12\x15\n\x06rev_id\x18\x03\x20\x01(\x03R\x05revId\"S\n\x0f\
UpdateDocParams\x12\x15\n\x06doc_id\x18\x01\x20\x01(\tR\x05docId\x12\x12\
\n\x04data\x18\x02\x20\x01(\tR\x04data\x12\x15\n\x06rev_id\x18\x03\x20\
\x01(\x03R\x05revId\"5\n\x08DocDelta\x12\x15\n\x06doc_id\x18\x01\x20\x01\
(\tR\x05docId\x12\x12\n\x04data\x18\x02\x20\x01(\tR\x04data\"S\n\nNewDoc\
User\x12\x17\n\x07user_id\x18\x01\x20\x01(\tR\x06userId\x12\x15\n\x06rev\
_id\x18\x02\x20\x01(\x03R\x05revId\x12\x15\n\x06doc_id\x18\x03\x20\x01(\
\tR\x05docId\"'\n\x0eQueryDocParams\x12\x15\n\x06doc_id\x18\x01\x20\x01(\
\tR\x05docIdJ\xa4\x07\n\x06\x12\x04\0\0\x1b\x01\n\x08\n\x01\x0c\x12\x03\
\0\0\x12\n\n\n\x02\x04\0\x12\x04\x02\0\x05\x01\n\n\n\x03\x04\0\x01\x12\
\x03\x02\x08\x17\n\x0b\n\x04\x04\0\x02\0\x12\x03\x03\x04\x12\n\x0c\n\x05\
\x04\0\x02\0\x05\x12\x03\x03\x04\n\n\x0c\n\x05\x04\0\x02\0\x01\x12\x03\
\x03\x0b\r\n\x0c\n\x05\x04\0\x02\0\x03\x12\x03\x03\x10\x11\n\x0b\n\x04\
\x04\0\x02\x01\x12\x03\x04\x04\x14\n\x0c\n\x05\x04\0\x02\x01\x05\x12\x03\
\x04\x04\n\n\x0c\n\x05\x04\0\x02\x01\x01\x12\x03\x04\x0b\x0f\n\x0c\n\x05\
\x04\0\x02\x01\x03\x12\x03\x04\x12\x13\n\n\n\x02\x04\x01\x12\x04\x06\0\n\
\x01\n\n\n\x03\x04\x01\x01\x12\x03\x06\x08\x0b\n\x0b\n\x04\x04\x01\x02\0\
\x12\x03\x07\x04\x12\n\x0c\n\x05\x04\x01\x02\0\x05\x12\x03\x07\x04\n\n\
\x0c\n\x05\x04\x01\x02\0\x01\x12\x03\x07\x0b\r\n\x0c\n\x05\x04\x01\x02\0\
\x03\x12\x03\x07\x10\x11\n\x0b\n\x04\x04\x01\x02\x01\x12\x03\x08\x04\x14\
\n\x0c\n\x05\x04\x01\x02\x01\x05\x12\x03\x08\x04\n\n\x0c\n\x05\x04\x01\
\x02\x01\x01\x12\x03\x08\x0b\x0f\n\x0c\n\x05\x04\x01\x02\x01\x03\x12\x03\
\x08\x12\x13\n\x0b\n\x04\x04\x01\x02\x02\x12\x03\t\x04\x15\n\x0c\n\x05\
\x04\x01\x02\x02\x05\x12\x03\t\x04\t\n\x0c\n\x05\x04\x01\x02\x02\x01\x12\
\x03\t\n\x10\n\x0c\n\x05\x04\x01\x02\x02\x03\x12\x03\t\x13\x14\n\n\n\x02\
\x04\x02\x12\x04\x0b\0\x0f\x01\n\n\n\x03\x04\x02\x01\x12\x03\x0b\x08\x17\
\n\x0b\n\x04\x04\x02\x02\0\x12\x03\x0c\x04\x16\n\x0c\n\x05\x04\x02\x02\0\
\x05\x12\x03\x0c\x04\n\n\x0c\n\x05\x04\x02\x02\0\x01\x12\x03\x0c\x0b\x11\
\n\x0c\n\x05\x04\x02\x02\0\x03\x12\x03\x0c\x14\x15\n\x0b\n\x04\x04\x02\
\x02\x01\x12\x03\r\x04\x14\n\x0c\n\x05\x04\x02\x02\x01\x05\x12\x03\r\x04\
\n\n\x0c\n\x05\x04\x02\x02\x01\x01\x12\x03\r\x0b\x0f\n\x0c\n\x05\x04\x02\
\x02\x01\x03\x12\x03\r\x12\x13\n\x0b\n\x04\x04\x02\x02\x02\x12\x03\x0e\
\x04\x15\n\x0c\n\x05\x04\x02\x02\x02\x05\x12\x03\x0e\x04\t\n\x0c\n\x05\
\x04\x02\x02\x02\x01\x12\x03\x0e\n\x10\n\x0c\n\x05\x04\x02\x02\x02\x03\
\x12\x03\x0e\x13\x14\n\n\n\x02\x04\x03\x12\x04\x10\0\x13\x01\n\n\n\x03\
\x04\x03\x01\x12\x03\x10\x08\x10\n\x0b\n\x04\x04\x03\x02\0\x12\x03\x11\
\x04\x16\n\x0c\n\x05\x04\x03\x02\0\x05\x12\x03\x11\x04\n\n\x0c\n\x05\x04\
\x03\x02\0\x01\x12\x03\x11\x0b\x11\n\x0c\n\x05\x04\x03\x02\0\x03\x12\x03\
\x11\x14\x15\n\x0b\n\x04\x04\x03\x02\x01\x12\x03\x12\x04\x14\n\x0c\n\x05\
\x04\x03\x02\x01\x05\x12\x03\x12\x04\n\n\x0c\n\x05\x04\x03\x02\x01\x01\
\x12\x03\x12\x0b\x0f\n\x0c\n\x05\x04\x03\x02\x01\x03\x12\x03\x12\x12\x13\
\n\n\n\x02\x04\x04\x12\x04\x14\0\x18\x01\n\n\n\x03\x04\x04\x01\x12\x03\
\x14\x08\x12\n\x0b\n\x04\x04\x04\x02\0\x12\x03\x15\x04\x17\n\x0c\n\x05\
\x04\x04\x02\0\x05\x12\x03\x15\x04\n\n\x0c\n\x05\x04\x04\x02\0\x01\x12\
\x03\x15\x0b\x12\n\x0c\n\x05\x04\x04\x02\0\x03\x12\x03\x15\x15\x16\n\x0b\
\n\x04\x04\x04\x02\x01\x12\x03\x16\x04\x15\n\x0c\n\x05\x04\x04\x02\x01\
\x05\x12\x03\x16\x04\t\n\x0c\n\x05\x04\x04\x02\x01\x01\x12\x03\x16\n\x10\
\n\x0c\n\x05\x04\x04\x02\x01\x03\x12\x03\x16\x13\x14\n\x0b\n\x04\x04\x04\
\x02\x02\x12\x03\x17\x04\x16\n\x0c\n\x05\x04\x04\x02\x02\x05\x12\x03\x17\
\x04\n\n\x0c\n\x05\x04\x04\x02\x02\x01\x12\x03\x17\x0b\x11\n\x0c\n\x05\
\x04\x04\x02\x02\x03\x12\x03\x17\x14\x15\n\n\n\x02\x04\x05\x12\x04\x19\0\
\x1b\x01\n\n\n\x03\x04\x05\x01\x12\x03\x19\x08\x16\n\x0b\n\x04\x04\x05\
\x02\0\x12\x03\x1a\x04\x16\n\x0c\n\x05\x04\x05\x02\0\x05\x12\x03\x1a\x04\
\n\n\x0c\n\x05\x04\x05\x02\0\x01\x12\x03\x1a\x0b\x11\n\x0c\n\x05\x04\x05\
\x02\0\x03\x12\x03\x1a\x14\x15b\x06proto3\
(\tR\x04data\x12\x15\n\x06rev_id\x18\x03\x20\x01(\x03R\x05revId\x12\x1e\
\n\x0bbase_rev_id\x18\x04\x20\x01(\x03R\tbaseRevId\"S\n\x0fUpdateDocPara\
ms\x12\x15\n\x06doc_id\x18\x01\x20\x01(\tR\x05docId\x12\x12\n\x04data\
\x18\x02\x20\x01(\tR\x04data\x12\x15\n\x06rev_id\x18\x03\x20\x01(\x03R\
\x05revId\"5\n\x08DocDelta\x12\x15\n\x06doc_id\x18\x01\x20\x01(\tR\x05do\
cId\x12\x12\n\x04data\x18\x02\x20\x01(\tR\x04data\"S\n\nNewDocUser\x12\
\x17\n\x07user_id\x18\x01\x20\x01(\tR\x06userId\x12\x15\n\x06rev_id\x18\
\x02\x20\x01(\x03R\x05revId\x12\x15\n\x06doc_id\x18\x03\x20\x01(\tR\x05d\
ocId\"'\n\x0eQueryDocParams\x12\x15\n\x06doc_id\x18\x01\x20\x01(\tR\x05d\
ocIdJ\xdb\x07\n\x06\x12\x04\0\0\x1c\x01\n\x08\n\x01\x0c\x12\x03\0\0\x12\
\n\n\n\x02\x04\0\x12\x04\x02\0\x05\x01\n\n\n\x03\x04\0\x01\x12\x03\x02\
\x08\x17\n\x0b\n\x04\x04\0\x02\0\x12\x03\x03\x04\x12\n\x0c\n\x05\x04\0\
\x02\0\x05\x12\x03\x03\x04\n\n\x0c\n\x05\x04\0\x02\0\x01\x12\x03\x03\x0b\
\r\n\x0c\n\x05\x04\0\x02\0\x03\x12\x03\x03\x10\x11\n\x0b\n\x04\x04\0\x02\
\x01\x12\x03\x04\x04\x14\n\x0c\n\x05\x04\0\x02\x01\x05\x12\x03\x04\x04\n\
\n\x0c\n\x05\x04\0\x02\x01\x01\x12\x03\x04\x0b\x0f\n\x0c\n\x05\x04\0\x02\
\x01\x03\x12\x03\x04\x12\x13\n\n\n\x02\x04\x01\x12\x04\x06\0\x0b\x01\n\n\
\n\x03\x04\x01\x01\x12\x03\x06\x08\x0b\n\x0b\n\x04\x04\x01\x02\0\x12\x03\
\x07\x04\x12\n\x0c\n\x05\x04\x01\x02\0\x05\x12\x03\x07\x04\n\n\x0c\n\x05\
\x04\x01\x02\0\x01\x12\x03\x07\x0b\r\n\x0c\n\x05\x04\x01\x02\0\x03\x12\
\x03\x07\x10\x11\n\x0b\n\x04\x04\x01\x02\x01\x12\x03\x08\x04\x14\n\x0c\n\
\x05\x04\x01\x02\x01\x05\x12\x03\x08\x04\n\n\x0c\n\x05\x04\x01\x02\x01\
\x01\x12\x03\x08\x0b\x0f\n\x0c\n\x05\x04\x01\x02\x01\x03\x12\x03\x08\x12\
\x13\n\x0b\n\x04\x04\x01\x02\x02\x12\x03\t\x04\x15\n\x0c\n\x05\x04\x01\
\x02\x02\x05\x12\x03\t\x04\t\n\x0c\n\x05\x04\x01\x02\x02\x01\x12\x03\t\n\
\x10\n\x0c\n\x05\x04\x01\x02\x02\x03\x12\x03\t\x13\x14\n\x0b\n\x04\x04\
\x01\x02\x03\x12\x03\n\x04\x1a\n\x0c\n\x05\x04\x01\x02\x03\x05\x12\x03\n\
\x04\t\n\x0c\n\x05\x04\x01\x02\x03\x01\x12\x03\n\n\x15\n\x0c\n\x05\x04\
\x01\x02\x03\x03\x12\x03\n\x18\x19\n\n\n\x02\x04\x02\x12\x04\x0c\0\x10\
\x01\n\n\n\x03\x04\x02\x01\x12\x03\x0c\x08\x17\n\x0b\n\x04\x04\x02\x02\0\
\x12\x03\r\x04\x16\n\x0c\n\x05\x04\x02\x02\0\x05\x12\x03\r\x04\n\n\x0c\n\
\x05\x04\x02\x02\0\x01\x12\x03\r\x0b\x11\n\x0c\n\x05\x04\x02\x02\0\x03\
\x12\x03\r\x14\x15\n\x0b\n\x04\x04\x02\x02\x01\x12\x03\x0e\x04\x14\n\x0c\
\n\x05\x04\x02\x02\x01\x05\x12\x03\x0e\x04\n\n\x0c\n\x05\x04\x02\x02\x01\
\x01\x12\x03\x0e\x0b\x0f\n\x0c\n\x05\x04\x02\x02\x01\x03\x12\x03\x0e\x12\
\x13\n\x0b\n\x04\x04\x02\x02\x02\x12\x03\x0f\x04\x15\n\x0c\n\x05\x04\x02\
\x02\x02\x05\x12\x03\x0f\x04\t\n\x0c\n\x05\x04\x02\x02\x02\x01\x12\x03\
\x0f\n\x10\n\x0c\n\x05\x04\x02\x02\x02\x03\x12\x03\x0f\x13\x14\n\n\n\x02\
\x04\x03\x12\x04\x11\0\x14\x01\n\n\n\x03\x04\x03\x01\x12\x03\x11\x08\x10\
\n\x0b\n\x04\x04\x03\x02\0\x12\x03\x12\x04\x16\n\x0c\n\x05\x04\x03\x02\0\
\x05\x12\x03\x12\x04\n\n\x0c\n\x05\x04\x03\x02\0\x01\x12\x03\x12\x0b\x11\
\n\x0c\n\x05\x04\x03\x02\0\x03\x12\x03\x12\x14\x15\n\x0b\n\x04\x04\x03\
\x02\x01\x12\x03\x13\x04\x14\n\x0c\n\x05\x04\x03\x02\x01\x05\x12\x03\x13\
\x04\n\n\x0c\n\x05\x04\x03\x02\x01\x01\x12\x03\x13\x0b\x0f\n\x0c\n\x05\
\x04\x03\x02\x01\x03\x12\x03\x13\x12\x13\n\n\n\x02\x04\x04\x12\x04\x15\0\
\x19\x01\n\n\n\x03\x04\x04\x01\x12\x03\x15\x08\x12\n\x0b\n\x04\x04\x04\
\x02\0\x12\x03\x16\x04\x17\n\x0c\n\x05\x04\x04\x02\0\x05\x12\x03\x16\x04\
\n\n\x0c\n\x05\x04\x04\x02\0\x01\x12\x03\x16\x0b\x12\n\x0c\n\x05\x04\x04\
\x02\0\x03\x12\x03\x16\x15\x16\n\x0b\n\x04\x04\x04\x02\x01\x12\x03\x17\
\x04\x15\n\x0c\n\x05\x04\x04\x02\x01\x05\x12\x03\x17\x04\t\n\x0c\n\x05\
\x04\x04\x02\x01\x01\x12\x03\x17\n\x10\n\x0c\n\x05\x04\x04\x02\x01\x03\
\x12\x03\x17\x13\x14\n\x0b\n\x04\x04\x04\x02\x02\x12\x03\x18\x04\x16\n\
\x0c\n\x05\x04\x04\x02\x02\x05\x12\x03\x18\x04\n\n\x0c\n\x05\x04\x04\x02\
\x02\x01\x12\x03\x18\x0b\x11\n\x0c\n\x05\x04\x04\x02\x02\x03\x12\x03\x18\
\x14\x15\n\n\n\x02\x04\x05\x12\x04\x1a\0\x1c\x01\n\n\n\x03\x04\x05\x01\
\x12\x03\x1a\x08\x16\n\x0b\n\x04\x04\x05\x02\0\x12\x03\x1b\x04\x16\n\x0c\
\n\x05\x04\x05\x02\0\x05\x12\x03\x1b\x04\n\n\x0c\n\x05\x04\x05\x02\0\x01\
\x12\x03\x1b\x0b\x11\n\x0c\n\x05\x04\x05\x02\0\x03\x12\x03\x1b\x14\x15b\
\x06proto3\
";
static file_descriptor_proto_lazy: ::protobuf::rt::LazyV2<::protobuf::descriptor::FileDescriptorProto> = ::protobuf::rt::LazyV2::INIT;

View File

@ -8,6 +8,7 @@ message Doc {
string id = 1;
string data = 2;
int64 rev_id = 3;
int64 base_rev_id = 4;
}
message UpdateDocParams {
string doc_id = 1;

View File

@ -4,7 +4,7 @@ use bytes::Bytes;
use crate::{
entities::doc::{CreateDocParams, Doc, DocDelta, QueryDocParams},
errors::{internal_error, DocError, DocResult},
errors::{DocError, DocResult},
module::DocumentUser,
services::{
cache::DocCache,
@ -16,7 +16,7 @@ use crate::{
ws::WsDocumentManager,
},
};
use flowy_database::{ConnectionPool, SqliteConnection};
use flowy_database::ConnectionPool;
use flowy_infra::future::{wrap_future, FnFuture, ResultFuture};
use flowy_ot::core::Delta;
use tokio::time::{interval, Duration};
@ -45,8 +45,8 @@ impl DocController {
Ok(())
}
#[tracing::instrument(skip(self, conn), err)]
pub(crate) fn create(&self, params: CreateDocParams, conn: &SqliteConnection) -> Result<(), DocError> {
#[tracing::instrument(skip(self), err)]
pub(crate) fn create(&self, params: CreateDocParams) -> Result<(), DocError> {
// let _doc = Doc {
// id: params.id,
// data: params.data,
@ -77,8 +77,8 @@ impl DocController {
Ok(())
}
#[tracing::instrument(level = "debug", skip(self, conn), err)]
pub(crate) fn delete(&self, params: QueryDocParams, conn: &SqliteConnection) -> Result<(), DocError> {
#[tracing::instrument(level = "debug", skip(self), err)]
pub(crate) fn delete(&self, params: QueryDocParams) -> Result<(), DocError> {
let doc_id = &params.doc_id;
self.cache.remove(doc_id);
self.ws_manager.remove_handler(doc_id);
@ -87,10 +87,10 @@ impl DocController {
}
#[tracing::instrument(level = "debug", skip(self, delta), err)]
pub(crate) async fn edit_doc(&self, delta: DocDelta) -> Result<Doc, DocError> {
pub(crate) async fn edit_doc(&self, delta: DocDelta) -> Result<DocDelta, DocError> {
let edit_doc_ctx = self.cache.get(&delta.doc_id)?;
let _ = edit_doc_ctx.compose_local_delta(Bytes::from(delta.data)).await?;
Ok(edit_doc_ctx.doc().await?)
Ok(edit_doc_ctx.delta().await?)
}
}
@ -137,7 +137,7 @@ struct RevisionServerImpl {
}
impl RevisionServer for RevisionServerImpl {
fn fetch_document_from_remote(&self, doc_id: &str) -> ResultFuture<DocRevision, DocError> {
fn fetch_document_from_remote(&self, doc_id: &str) -> ResultFuture<Doc, DocError> {
let params = QueryDocParams {
doc_id: doc_id.to_string(),
};
@ -147,14 +147,7 @@ impl RevisionServer for RevisionServerImpl {
ResultFuture::new(async move {
match server.read_doc(&token, params).await? {
None => Err(DocError::not_found()),
Some(doc) => {
let delta = Delta::from_bytes(doc.data)?;
Ok(DocRevision {
base_rev_id: 0.into(),
rev_id: doc.rev_id.into(),
delta,
})
},
Some(doc) => Ok(doc),
}
})
}

View File

@ -1,6 +1,6 @@
use crate::{
entities::doc::{RevId, Revision},
errors::{internal_error, DocResult},
errors::DocResult,
services::doc::{
edit::{
message::{DocumentMsg, TransformDeltas},

View File

@ -1,6 +1,6 @@
use crate::{
entities::{
doc::{Doc, RevId, RevType, Revision, RevisionRange},
doc::{Doc, DocDelta, RevId, RevType, Revision, RevisionRange},
ws::{WsDataType, WsDocumentData},
},
errors::{internal_error, DocError, DocResult},
@ -46,12 +46,9 @@ impl ClientEditDoc {
user: Arc<dyn DocumentUser>,
) -> DocResult<Self> {
let rev_store = spawn_rev_store_actor(doc_id, pool.clone(), server.clone());
let DocRevision {
base_rev_id: _,
rev_id,
delta,
} = load_document(rev_store.clone()).await?;
let rev_manager = Arc::new(RevisionManager::new(doc_id, rev_id, rev_store));
let doc = load_document(rev_store.clone()).await?;
let delta = doc.delta()?;
let rev_manager = Arc::new(RevisionManager::new(doc_id, doc.rev_id.into(), rev_store));
let document = spawn_doc_edit_actor(doc_id, delta, pool.clone());
let doc_id = doc_id.to_string();
let edit_doc = Self {
@ -142,15 +139,16 @@ impl ClientEditDoc {
rx.await.map_err(internal_error)?
}
pub async fn doc(&self) -> DocResult<Doc> {
pub async fn delta(&self) -> DocResult<DocDelta> {
let (ret, rx) = oneshot::channel::<DocResult<String>>();
let msg = DocumentMsg::Doc { ret };
let _ = self.document.send(msg);
let data = rx.await.map_err(internal_error)??;
let rev_id = self.rev_manager.rev_id();
let id = self.doc_id.clone();
Ok(Doc { id, data, rev_id })
Ok(DocDelta {
doc_id: self.doc_id.clone(),
data,
})
}
async fn mk_revision(&self, delta_data: &Bytes) -> Result<RevId, DocError> {
@ -229,7 +227,7 @@ impl ClientEditDoc {
let _ = rx.await.map_err(internal_error)??;
// update rev id
self.rev_manager.update_rev_id(server_rev_id.clone().into());
self.rev_manager.set_rev_id(server_rev_id.clone().into());
let (_, local_rev_id) = self.rev_manager.next_rev_id();
// save the revision
@ -327,7 +325,7 @@ fn spawn_doc_edit_actor(doc_id: &str, delta: Delta, pool: Arc<ConnectionPool>) -
sender
}
async fn load_document(sender: mpsc::Sender<RevisionCmd>) -> DocResult<DocRevision> {
async fn load_document(sender: mpsc::Sender<RevisionCmd>) -> DocResult<Doc> {
let (ret, rx) = oneshot::channel();
let _ = sender.send(RevisionCmd::DocumentDelta { ret }).await;
let result = rx.await.map_err(internal_error)?;

View File

@ -1,5 +1,5 @@
use crate::{
entities::doc::{RevId, RevType, Revision, RevisionRange},
entities::doc::{Doc, RevId, RevType, Revision, RevisionRange},
errors::{internal_error, DocError},
services::{doc::revision::store_actor::RevisionCmd, util::RevIdCounter, ws::DocumentWebSocket},
};
@ -14,7 +14,7 @@ pub struct DocRevision {
}
pub trait RevisionServer: Send + Sync {
fn fetch_document_from_remote(&self, doc_id: &str) -> ResultFuture<DocRevision, DocError>;
fn fetch_document_from_remote(&self, doc_id: &str) -> ResultFuture<Doc, DocError>;
}
pub struct RevisionManager {
@ -60,7 +60,7 @@ impl RevisionManager {
(cur, next)
}
pub fn update_rev_id(&self, rev_id: i64) { self.rev_id_counter.set(rev_id); }
pub fn set_rev_id(&self, rev_id: i64) { self.rev_id_counter.set(rev_id); }
pub async fn construct_revisions(&self, range: RevisionRange) -> Result<Revision, DocError> {
debug_assert!(&range.doc_id == &self.doc_id);

View File

@ -1,5 +1,5 @@
use crate::{
entities::doc::{RevId, RevType, Revision, RevisionRange},
entities::doc::{revision_from_doc, Doc, RevId, RevType, Revision, RevisionRange},
errors::{internal_error, DocError, DocResult},
services::doc::revision::{model::RevisionOperation, DocRevision, RevisionServer},
sql_tables::{RevState, RevTableSql},
@ -28,7 +28,7 @@ pub enum RevisionCmd {
ret: oneshot::Sender<DocResult<Vec<Revision>>>,
},
DocumentDelta {
ret: oneshot::Sender<DocResult<DocRevision>>,
ret: oneshot::Sender<DocResult<Doc>>,
},
}
@ -181,35 +181,22 @@ impl RevisionStoreActor {
}
}
async fn fetch_document(&self) -> DocResult<DocRevision> {
async fn fetch_document(&self) -> DocResult<Doc> {
let result = fetch_from_local(&self.doc_id, self.persistence.clone()).await;
if result.is_ok() {
return result;
}
match self.server.fetch_document_from_remote(&self.doc_id).await {
Ok(doc_revision) => {
let delta_data = doc_revision.delta.to_bytes();
let revision = Revision::new(
doc_revision.base_rev_id.clone(),
doc_revision.rev_id.clone(),
delta_data.to_vec(),
&self.doc_id,
RevType::Remote,
);
self.handle_new_revision(revision);
Ok(doc_revision)
},
Err(e) => Err(e),
}
let doc = self.server.fetch_document_from_remote(&self.doc_id).await?;
let revision = revision_from_doc(doc.clone(), RevType::Remote);
self.handle_new_revision(revision);
Ok(doc)
}
}
async fn fetch_from_local(doc_id: &str, persistence: Arc<Persistence>) -> DocResult<DocRevision> {
async fn fetch_from_local(doc_id: &str, persistence: Arc<Persistence>) -> DocResult<Doc> {
let doc_id = doc_id.to_owned();
spawn_blocking(move || {
// tokio::time::timeout
let conn = &*persistence.pool.get().map_err(internal_error)?;
let revisions = persistence.rev_sql.read_rev_tables(&doc_id, None, conn)?;
if revisions.is_empty() {
@ -230,12 +217,11 @@ async fn fetch_from_local(doc_id: &str, persistence: Arc<Persistence>) -> DocRes
}
}
delta.insert("\n", Attributes::default());
Result::<DocRevision, DocError>::Ok(DocRevision {
base_rev_id,
rev_id,
delta,
Result::<Doc, DocError>::Ok(Doc {
id: doc_id,
data: delta.to_json(),
rev_id: rev_id.into(),
base_rev_id: base_rev_id.into(),
})
})
.await

View File

@ -46,10 +46,10 @@ pub enum WorkspaceEvent {
#[event(input = "DeleteViewRequest")]
DeleteView = 204,
#[event(input = "OpenViewRequest", output = "Doc")]
#[event(input = "OpenViewRequest", output = "DocDelta")]
OpenView = 205,
#[event(input = "DocDelta", output = "Doc")]
#[event(input = "DocDelta", output = "DocDelta")]
ApplyDocDelta = 206,
#[event()]

View File

@ -15,7 +15,7 @@ use crate::{
services::ViewController,
};
use flowy_dispatch::prelude::{data_result, Data, DataResult, Unit};
use flowy_document::entities::doc::{Doc, DocDelta, QueryDocParams};
use flowy_document::entities::doc::{DocDelta, QueryDocParams};
use std::{convert::TryInto, sync::Arc};
#[tracing::instrument(skip(data, controller), err)]
@ -58,7 +58,7 @@ pub(crate) async fn update_view_handler(
pub(crate) async fn apply_doc_delta_handler(
data: Data<DocDelta>,
controller: Unit<Arc<ViewController>>,
) -> DataResult<Doc, WorkspaceError> {
) -> DataResult<DocDelta, WorkspaceError> {
// let params: DocDelta = data.into_inner().try_into()?;
let doc = controller.apply_doc_delta(data.into_inner()).await?;
data_result(doc)
@ -78,7 +78,7 @@ pub(crate) async fn delete_view_handler(
pub(crate) async fn open_view_handler(
data: Data<OpenViewRequest>,
controller: Unit<Arc<ViewController>>,
) -> DataResult<Doc, WorkspaceError> {
) -> DataResult<DocDelta, WorkspaceError> {
let params: QueryDocParams = data.into_inner().try_into()?;
let doc = controller.open_view(params).await?;
data_result(doc)

View File

@ -15,7 +15,7 @@ use crate::{
};
use flowy_database::SqliteConnection;
use flowy_document::{
entities::doc::{CreateDocParams, Doc, DocDelta, QueryDocParams},
entities::doc::{CreateDocParams, DocDelta, QueryDocParams},
module::FlowyDocument,
};
use std::sync::Arc;
@ -56,8 +56,7 @@ impl ViewController {
// TODO: rollback anything created before if failed?
conn.immediate_transaction::<_, WorkspaceError, _>(|| {
let _ = self.save_view(view.clone(), conn)?;
self.document
.create(CreateDocParams::new(&view.id, params.data), conn)?;
self.document.create(CreateDocParams::new(&view.id, params.data))?;
let repeated_view = self.read_local_views_belong_to(&view.belong_to_id, conn)?;
dart_notify(&view.belong_to_id, WorkspaceObservable::AppCreateView)
@ -84,9 +83,9 @@ impl ViewController {
}
#[tracing::instrument(level = "debug", skip(self), err)]
pub(crate) async fn open_view(&self, params: QueryDocParams) -> Result<Doc, WorkspaceError> {
pub(crate) async fn open_view(&self, params: QueryDocParams) -> Result<DocDelta, WorkspaceError> {
let edit_context = self.document.open(params, self.database.db_pool()?).await?;
Ok(edit_context.doc().await.map_err(internal_error)?)
Ok(edit_context.delta().await.map_err(internal_error)?)
}
pub(crate) async fn delete_view(&self, params: DeleteViewParams) -> Result<(), WorkspaceError> {
@ -95,7 +94,7 @@ impl ViewController {
conn.immediate_transaction::<_, WorkspaceError, _>(|| {
let view_table = self.sql.delete_view(&params.view_id, conn)?;
let _ = self.document.delete(params.into(), conn)?;
let _ = self.document.delete(params.into())?;
let repeated_view = self.read_local_views_belong_to(&view_table.belong_to_id, conn)?;
dart_notify(&view_table.belong_to_id, WorkspaceObservable::AppDeleteView)
@ -134,7 +133,7 @@ impl ViewController {
Ok(())
}
pub(crate) async fn apply_doc_delta(&self, params: DocDelta) -> Result<Doc, WorkspaceError> {
pub(crate) async fn apply_doc_delta(&self, params: DocDelta) -> Result<DocDelta, WorkspaceError> {
let doc = self.document.apply_doc_delta(params).await?;
Ok(doc)
}

View File

@ -6,7 +6,7 @@ use crate::{
};
use bytes::Bytes;
use dashmap::DashMap;
use flowy_infra::retry::{Action, ExponentialBackoff, FixedInterval, Retry};
use flowy_infra::retry::{Action, FixedInterval, Retry};
use flowy_net::errors::ServerError;
use futures_channel::mpsc::{UnboundedReceiver, UnboundedSender};
use futures_core::{ready, Stream};

View File

@ -0,0 +1,10 @@
[tasks.show_usage]
script = """
docker system df -v
# docker volume rm xxxxx
docker volume prune
"""
#docker container ls -al
#docker stop clever_mayer
#docker rm clever_mayer