From 45e74e48a2c3de0fa6ad2b7332859f1bfadfb4fe Mon Sep 17 00:00:00 2001 From: appflowy Date: Tue, 21 Sep 2021 16:21:35 +0800 Subject: [PATCH] send delta to server --- backend/src/service/user_service/logged_user.rs | 2 +- rust-lib/flowy-document/src/module.rs | 3 ++- .../src/services/doc_controller.rs | 2 -- .../src/services/open_doc/manager.rs | 10 ++++++++-- .../src/services/open_doc/open_doc.rs | 16 +++++++++++----- .../flowy-document/src/services/ws/ws_manager.rs | 6 +++--- rust-lib/flowy-ot/Cargo.toml | 1 + rust-lib/flowy-ot/src/core/delta/delta.rs | 11 +++++++++++ .../flowy-sdk/src/deps_resolve/document_deps.rs | 2 +- 9 files changed, 38 insertions(+), 15 deletions(-) diff --git a/backend/src/service/user_service/logged_user.rs b/backend/src/service/user_service/logged_user.rs index 40c06f3141..443574ad70 100644 --- a/backend/src/service/user_service/logged_user.rs +++ b/backend/src/service/user_service/logged_user.rs @@ -77,7 +77,7 @@ enum AuthStatus { NotAuthorized, } -pub const EXPIRED_DURATION_DAYS: i64 = 5; +pub const EXPIRED_DURATION_DAYS: i64 = 30; pub struct AuthorizedUsers(DashMap); impl AuthorizedUsers { diff --git a/rust-lib/flowy-document/src/module.rs b/rust-lib/flowy-document/src/module.rs index e9b4f52702..70fa5e6888 100644 --- a/rust-lib/flowy-document/src/module.rs +++ b/rust-lib/flowy-document/src/module.rs @@ -3,6 +3,7 @@ use crate::{ errors::{internal_error, DocError}, services::{doc_controller::DocController, open_doc::OpenedDocManager, server::construct_doc_server, ws::WsManager}, }; +use bytes::Bytes; use diesel::SqliteConnection; use flowy_database::ConnectionPool; use parking_lot::RwLock; @@ -56,7 +57,7 @@ impl FlowyDocument { } pub async fn apply_changeset(&self, params: ApplyChangesetParams, pool: Arc) -> Result { - let _ = self.doc_manager.apply_changeset(¶ms.id, params.data, pool).await?; + let _ = self.doc_manager.apply_changeset(¶ms.id, Bytes::from(params.data), pool).await?; let data = self.doc_manager.read_doc(¶ms.id).await?; let doc = Doc { id: params.id, data }; Ok(doc) diff --git a/rust-lib/flowy-document/src/services/doc_controller.rs b/rust-lib/flowy-document/src/services/doc_controller.rs index 5af4e44229..42a6f186b7 100644 --- a/rust-lib/flowy-document/src/services/doc_controller.rs +++ b/rust-lib/flowy-document/src/services/doc_controller.rs @@ -135,8 +135,6 @@ impl OpenedDocPersistence for DocController { fn save(&self, params: SaveDocParams, pool: Arc) -> Result<(), DocError> { let changeset = DocTableChangeset::new(params.clone()); let _ = self.sql.update_doc_table(changeset, &*(pool.get().map_err(internal_error)?))?; - let _ = self.update_doc_on_server(params)?; - Ok(()) } } diff --git a/rust-lib/flowy-document/src/services/open_doc/manager.rs b/rust-lib/flowy-document/src/services/open_doc/manager.rs index 73e501d87b..43e0b4ae4f 100644 --- a/rust-lib/flowy-document/src/services/open_doc/manager.rs +++ b/rust-lib/flowy-document/src/services/open_doc/manager.rs @@ -5,6 +5,7 @@ use crate::{ ws::WsManager, }, }; +use bytes::Bytes; use dashmap::DashMap; use flowy_database::ConnectionPool; use flowy_ot::{core::Delta, errors::OTError}; @@ -32,7 +33,12 @@ impl OpenedDocManager { T: Into + Debug, D: TryInto, { - let doc = Arc::new(OpenedDoc::new(id.into(), data.try_into()?, self.persistence.clone())); + let doc = Arc::new(OpenedDoc::new( + id.into(), + data.try_into()?, + self.persistence.clone(), + self.ws_manager.read().sender.clone(), + )); self.ws_manager.write().register_handler(doc.id.as_ref(), doc.clone()); self.doc_map.insert(doc.id.clone(), doc.clone()); Ok(()) @@ -47,7 +53,7 @@ impl OpenedDocManager { } #[tracing::instrument(level = "debug", skip(self, changeset, pool), err)] - pub(crate) async fn apply_changeset(&self, id: T, changeset: Vec, pool: Arc) -> Result<(), DocError> + pub(crate) async fn apply_changeset(&self, id: T, changeset: Bytes, pool: Arc) -> Result<(), DocError> where T: Into + Debug, { diff --git a/rust-lib/flowy-document/src/services/open_doc/open_doc.rs b/rust-lib/flowy-document/src/services/open_doc/open_doc.rs index 07d0e5a270..7545fac355 100644 --- a/rust-lib/flowy-document/src/services/open_doc/open_doc.rs +++ b/rust-lib/flowy-document/src/services/open_doc/open_doc.rs @@ -4,8 +4,9 @@ use crate::{ ws::{WsDocumentData, WsSource}, }, errors::DocError, - services::ws::WsHandler, + services::ws::{WsHandler, WsSender}, }; +use bytes::Bytes; use flowy_database::ConnectionPool; use flowy_ot::{client::Document, core::Delta}; use parking_lot::RwLock; @@ -30,23 +31,28 @@ pub(crate) trait OpenedDocPersistence: Send + Sync { pub(crate) struct OpenedDoc { pub(crate) id: DocId, document: RwLock, + ws_sender: Arc, persistence: Arc, } impl OpenedDoc { - pub(crate) fn new(id: DocId, delta: Delta, persistence: Arc) -> Self { + pub(crate) fn new(id: DocId, delta: Delta, persistence: Arc, ws_sender: Arc) -> Self { + let document = RwLock::new(Document::from_delta(delta)); Self { id, - document: RwLock::new(Document::from_delta(delta)), + document, + ws_sender, persistence, } } pub(crate) fn data(&self) -> Vec { self.document.read().to_bytes() } - pub(crate) fn apply_delta(&self, data: Vec, pool: Arc) -> Result<(), DocError> { + pub(crate) fn apply_delta(&self, data: Bytes, pool: Arc) -> Result<(), DocError> { let mut write_guard = self.document.write(); - let _ = write_guard.apply_changeset(data)?; + let _ = write_guard.apply_changeset(data.clone())?; + + self.ws_sender.send_data(data); // Opti: strategy to save the document let mut save = SaveDocParams { diff --git a/rust-lib/flowy-document/src/services/ws/ws_manager.rs b/rust-lib/flowy-document/src/services/ws/ws_manager.rs index d42c8c6368..1af17bb1fe 100644 --- a/rust-lib/flowy-document/src/services/ws/ws_manager.rs +++ b/rust-lib/flowy-document/src/services/ws/ws_manager.rs @@ -12,12 +12,12 @@ lazy_static! { } pub struct WsManager { - sender: Box, + pub(crate) sender: Arc, doc_handlers: HashMap>, } impl WsManager { - pub fn new(sender: Box) -> Self { + pub fn new(sender: Arc) -> Self { Self { sender, doc_handlers: HashMap::new(), @@ -45,9 +45,9 @@ impl WsManager { }, } } + pub fn send_data(&self, data: WsDocumentData) { let bytes: Bytes = data.try_into().unwrap(); - match self.sender.send_data(bytes) { Ok(_) => {}, Err(e) => { diff --git a/rust-lib/flowy-ot/Cargo.toml b/rust-lib/flowy-ot/Cargo.toml index 019919c9b2..d511bb9385 100644 --- a/rust-lib/flowy-ot/Cargo.toml +++ b/rust-lib/flowy-ot/Cargo.toml @@ -17,6 +17,7 @@ lazy_static = "1.4.0" url = "2.2" strum = "0.21" strum_macros = "0.21" +bytes = "1.0" [dev-dependencies] diff --git a/rust-lib/flowy-ot/src/core/delta/delta.rs b/rust-lib/flowy-ot/src/core/delta/delta.rs index c31f30150f..dd35fa18c8 100644 --- a/rust-lib/flowy-ot/src/core/delta/delta.rs +++ b/rust-lib/flowy-ot/src/core/delta/delta.rs @@ -3,6 +3,8 @@ use crate::{ errors::{ErrorBuilder, OTError, OTErrorCode}, }; use bytecount::num_chars; +use bytes::Bytes; +use serde::__private::TryFrom; use std::{ cmp::{min, Ordering}, fmt, @@ -44,6 +46,15 @@ impl std::convert::TryFrom> for Delta { fn try_from(bytes: Vec) -> Result { Delta::from_bytes(bytes) } } +impl std::convert::TryFrom for Delta { + type Error = OTError; + + fn try_from(value: Bytes) -> Result { + let bytes = value.to_vec(); + Delta::from_bytes(bytes) + } +} + // impl>> std::convert::From for Delta { // fn from(bytes: T) -> Self { // Delta::from_bytes(bytes.as_ref().to_vec()).unwrap() } } diff --git a/rust-lib/flowy-sdk/src/deps_resolve/document_deps.rs b/rust-lib/flowy-sdk/src/deps_resolve/document_deps.rs index 62a856dc7c..249aea34f0 100644 --- a/rust-lib/flowy-sdk/src/deps_resolve/document_deps.rs +++ b/rust-lib/flowy-sdk/src/deps_resolve/document_deps.rs @@ -22,7 +22,7 @@ impl DocumentDepsResolver { user: self.user_session.clone(), }); - let sender = Box::new(WsSenderImpl { + let sender = Arc::new(WsSenderImpl { user: self.user_session.clone(), });