From 23c4924532aab1439c39065272304026b463359c Mon Sep 17 00:00:00 2001 From: appflowy Date: Sat, 22 Jan 2022 22:41:24 +0800 Subject: [PATCH] config folder sync on backend --- backend/src/context.rs | 47 ++++-- backend/src/services/document/persistence.rs | 148 ++---------------- backend/src/services/document/router.rs | 4 +- backend/src/services/document/ws_receiver.rs | 51 +++--- backend/src/services/folder/trash/router.rs | 2 +- backend/src/services/folder/trash/trash.rs | 30 ++-- .../src/services/folder/view/controller.rs | 15 +- backend/src/services/folder/view/router.rs | 4 +- backend/src/services/folder/ws_actor.rs | 19 ++- backend/src/services/folder/ws_receiver.rs | 113 ++++++++++++- backend/src/services/kv/mod.rs | 1 + backend/src/services/kv/revision_kv.rs | 120 ++++++++++++++ backend/tests/api_test/kv_test.rs | 8 +- backend/tests/document_test/edit_script.rs | 2 +- .../infrastructure/repos/trash_repo.dart | 4 +- .../flowy_sdk/lib/dispatch/dispatch.dart | 1 + shared-lib/backend-service/src/errors.rs | 10 +- 17 files changed, 360 insertions(+), 219 deletions(-) create mode 100644 backend/src/services/kv/revision_kv.rs diff --git a/backend/src/context.rs b/backend/src/context.rs index 3a72c20e54..74a2085f0c 100644 --- a/backend/src/context.rs +++ b/backend/src/context.rs @@ -6,13 +6,11 @@ use actix::Addr; use actix_web::web::Data; use crate::services::{ - document::{ - persistence::DocumentKVPersistence, - ws_receiver::{make_document_ws_receiver, HttpDocumentCloudPersistence}, - }, - folder::ws_receiver::make_folder_ws_receiver, + document::ws_receiver::{make_document_ws_receiver, HttpDocumentCloudPersistence}, + folder::ws_receiver::{make_folder_ws_receiver, HttpFolderCloudPersistence}, + kv::revision_kv::RevisionKVPersistence, }; -use flowy_collaboration::server_document::ServerDocumentManager; +use flowy_collaboration::{server_document::ServerDocumentManager, server_folder::ServerFolderManager}; use lib_ws::WSChannel; use sqlx::PgPool; use std::sync::Arc; @@ -23,6 +21,7 @@ pub struct AppContext { pub persistence: Data>, pub ws_receivers: Data, pub document_manager: Data>, + pub folder_manager: Data>, } impl AppContext { @@ -30,16 +29,22 @@ impl AppContext { let ws_server = Data::new(ws_server); let mut ws_receivers = WebSocketReceivers::new(); - let kv_store = make_document_kv_store(pg_pool.clone()); - let flowy_persistence = Arc::new(FlowyPersistence { pg_pool, kv_store }); + let document_store = make_document_kv_store(pg_pool.clone()); + let folder_store = make_folder_kv_store(pg_pool.clone()); + let flowy_persistence = Arc::new(FlowyPersistence { + pg_pool, + document_store, + folder_store, + }); - let document_persistence = Arc::new(HttpDocumentCloudPersistence(flowy_persistence.kv_store())); + let document_persistence = Arc::new(HttpDocumentCloudPersistence(flowy_persistence.document_kv_store())); let document_manager = Arc::new(ServerDocumentManager::new(document_persistence)); - let document_ws_receiver = make_document_ws_receiver(flowy_persistence.clone(), document_manager.clone()); ws_receivers.set(WSChannel::Document, document_ws_receiver); - let folder_ws_receiver = make_folder_ws_receiver(flowy_persistence.clone()); + let folder_persistence = Arc::new(HttpFolderCloudPersistence(flowy_persistence.folder_kv_store())); + let folder_manager = Arc::new(ServerFolderManager::new(folder_persistence)); + let folder_ws_receiver = make_folder_ws_receiver(flowy_persistence.clone(), folder_manager.clone()); ws_receivers.set(WSChannel::Folder, folder_ws_receiver); AppContext { @@ -47,23 +52,35 @@ impl AppContext { persistence: Data::new(flowy_persistence), ws_receivers: Data::new(ws_receivers), document_manager: Data::new(document_manager), + folder_manager: Data::new(folder_manager), } } } -fn make_document_kv_store(pg_pool: PgPool) -> Arc { +pub type DocumentRevisionKV = RevisionKVPersistence; +pub type FolderRevisionKV = RevisionKVPersistence; + +fn make_document_kv_store(pg_pool: PgPool) -> Arc { let kv_impl = Arc::new(PostgresKV { pg_pool }); - Arc::new(DocumentKVPersistence::new(kv_impl)) + Arc::new(DocumentRevisionKV::new(kv_impl)) +} + +fn make_folder_kv_store(pg_pool: PgPool) -> Arc { + let kv_impl = Arc::new(PostgresKV { pg_pool }); + Arc::new(FolderRevisionKV::new(kv_impl)) } #[derive(Clone)] pub struct FlowyPersistence { pg_pool: PgPool, - kv_store: Arc, + document_store: Arc, + folder_store: Arc, } impl FlowyPersistence { pub fn pg_pool(&self) -> PgPool { self.pg_pool.clone() } - pub fn kv_store(&self) -> Arc { self.kv_store.clone() } + pub fn document_kv_store(&self) -> Arc { self.document_store.clone() } + + pub fn folder_kv_store(&self) -> Arc { self.folder_store.clone() } } diff --git a/backend/src/services/document/persistence.rs b/backend/src/services/document/persistence.rs index 5bb0de32a4..895a8e3d92 100644 --- a/backend/src/services/document/persistence.rs +++ b/backend/src/services/document/persistence.rs @@ -1,44 +1,33 @@ -use crate::{ - services::kv::{KVStore, KeyValue}, - util::serde_ext::parse_from_bytes, -}; use anyhow::Context; use backend_service::errors::{internal_error, ServerError}; -use bytes::Bytes; + use flowy_collaboration::{ - protobuf::{ - CreateDocParams, - DocumentId, - DocumentInfo, - RepeatedRevision as RepeatedRevisionPB, - ResetDocumentParams, - Revision as RevisionPB, - }, + protobuf::{CreateDocParams, DocumentId, DocumentInfo, ResetDocumentParams}, server_document::ServerDocumentManager, util::make_document_info_pb_from_revisions_pb, }; -use protobuf::Message; +use crate::services::kv::revision_kv::RevisionKVPersistence; use std::sync::Arc; use uuid::Uuid; -#[tracing::instrument(level = "debug", skip(kv_store, params), err)] +#[tracing::instrument(level = "debug", skip(document_store, params), err)] pub(crate) async fn create_document( - kv_store: &Arc, + document_store: &Arc, mut params: CreateDocParams, ) -> Result<(), ServerError> { let revisions = params.take_revisions().take_items(); - let _ = kv_store.set_revision(revisions.into()).await?; + let _ = document_store.set_revision(revisions.into()).await?; Ok(()) } -#[tracing::instrument(level = "debug", skip(kv_store), err)] +#[tracing::instrument(level = "debug", skip(document_store), err)] pub async fn read_document( - kv_store: &Arc, + document_store: &Arc, params: DocumentId, ) -> Result { let _ = Uuid::parse_str(¶ms.doc_id).context("Parse document id to uuid failed")?; - let revisions = kv_store.get_revisions(¶ms.doc_id, None).await?; + let revisions = document_store.get_revisions(¶ms.doc_id, None).await?; match make_document_info_pb_from_revisions_pb(¶ms.doc_id, revisions) { Ok(Some(document_info)) => Ok(document_info), Ok(None) => Err(ServerError::record_not_found().context(format!("{} not exist", params.doc_id))), @@ -63,119 +52,12 @@ pub async fn reset_document( Ok(()) } -#[tracing::instrument(level = "debug", skip(kv_store), err)] -pub(crate) async fn delete_document(kv_store: &Arc, doc_id: Uuid) -> Result<(), ServerError> { +#[tracing::instrument(level = "debug", skip(document_store), err)] +pub(crate) async fn delete_document( + document_store: &Arc, + doc_id: Uuid, +) -> Result<(), ServerError> { // TODO: delete revisions may cause time issue. Maybe delete asynchronously? - let _ = kv_store.delete_revisions(&doc_id.to_string(), None).await?; + let _ = document_store.delete_revisions(&doc_id.to_string(), None).await?; Ok(()) } - -pub struct DocumentKVPersistence { - inner: Arc, -} - -impl std::ops::Deref for DocumentKVPersistence { - type Target = Arc; - - fn deref(&self) -> &Self::Target { &self.inner } -} - -impl std::ops::DerefMut for DocumentKVPersistence { - fn deref_mut(&mut self) -> &mut Self::Target { &mut self.inner } -} - -impl DocumentKVPersistence { - pub(crate) fn new(kv_store: Arc) -> Self { DocumentKVPersistence { inner: kv_store } } - - pub(crate) async fn set_revision(&self, revisions: Vec) -> Result<(), ServerError> { - let items = revisions_to_key_value_items(revisions)?; - self.inner - .transaction(|mut t| Box::pin(async move { t.batch_set(items).await })) - .await - } - - pub(crate) async fn get_revisions>>>( - &self, - doc_id: &str, - rev_ids: T, - ) -> Result { - let rev_ids = rev_ids.into(); - let items = match rev_ids { - None => { - let doc_id = doc_id.to_owned(); - self.inner - .transaction(|mut t| Box::pin(async move { t.batch_get_start_with(&doc_id).await })) - .await? - }, - Some(rev_ids) => { - let keys = rev_ids - .into_iter() - .map(|rev_id| make_revision_key(doc_id, rev_id)) - .collect::>(); - - self.inner - .transaction(|mut t| Box::pin(async move { t.batch_get(keys).await })) - .await? - }, - }; - - Ok(key_value_items_to_revisions(items)) - } - - pub(crate) async fn delete_revisions>>>( - &self, - doc_id: &str, - rev_ids: T, - ) -> Result<(), ServerError> { - match rev_ids.into() { - None => { - let doc_id = doc_id.to_owned(); - self.inner - .transaction(|mut t| Box::pin(async move { t.batch_delete_key_start_with(&doc_id).await })) - .await - }, - Some(rev_ids) => { - let keys = rev_ids - .into_iter() - .map(|rev_id| make_revision_key(doc_id, rev_id)) - .collect::>(); - - self.inner - .transaction(|mut t| Box::pin(async move { t.batch_delete(keys).await })) - .await - }, - } - } -} - -#[inline] -pub fn revisions_to_key_value_items(revisions: Vec) -> Result, ServerError> { - let mut items = vec![]; - for revision in revisions { - let key = make_revision_key(&revision.object_id, revision.rev_id); - - if revision.delta_data.is_empty() { - return Err(ServerError::internal().context("The delta_data of RevisionPB should not be empty")); - } - - let value = Bytes::from(revision.write_to_bytes().unwrap()); - items.push(KeyValue { key, value }); - } - Ok(items) -} - -#[inline] -fn key_value_items_to_revisions(items: Vec) -> RepeatedRevisionPB { - let mut revisions = items - .into_iter() - .filter_map(|kv| parse_from_bytes::(&kv.value).ok()) - .collect::>(); - - revisions.sort_by(|a, b| a.rev_id.cmp(&b.rev_id)); - let mut repeated_revision = RepeatedRevisionPB::new(); - repeated_revision.set_items(revisions.into()); - repeated_revision -} - -#[inline] -fn make_revision_key(doc_id: &str, rev_id: i64) -> String { format!("{}:{}", doc_id, rev_id) } diff --git a/backend/src/services/document/router.rs b/backend/src/services/document/router.rs index 2bdd6c88ea..07ddf5ab5c 100644 --- a/backend/src/services/document/router.rs +++ b/backend/src/services/document/router.rs @@ -23,7 +23,7 @@ pub async fn create_document_handler( persistence: Data>, ) -> Result { let params: CreateDocParamsPB = parse_from_payload(payload).await?; - let kv_store = persistence.kv_store(); + let kv_store = persistence.document_kv_store(); let _ = create_document(&kv_store, params).await?; Ok(FlowyResponse::success().into()) } @@ -34,7 +34,7 @@ pub async fn read_document_handler( persistence: Data>, ) -> Result { let params: DocumentIdPB = parse_from_payload(payload).await?; - let kv_store = persistence.kv_store(); + let kv_store = persistence.document_kv_store(); let doc = read_document(&kv_store, params).await?; let response = FlowyResponse::success().pb(doc)?; Ok(response.into()) diff --git a/backend/src/services/document/ws_receiver.rs b/backend/src/services/document/ws_receiver.rs index a536e1ceba..52ae6ee076 100644 --- a/backend/src/services/document/ws_receiver.rs +++ b/backend/src/services/document/ws_receiver.rs @@ -1,10 +1,11 @@ use crate::{ - context::FlowyPersistence, + context::{DocumentRevisionKV, FlowyPersistence}, services::{ document::{ - persistence::{create_document, read_document, revisions_to_key_value_items, DocumentKVPersistence}, + persistence::{create_document, read_document}, ws_actor::{DocumentWSActorMessage, DocumentWebSocketActor}, }, + kv::revision_kv::revisions_to_key_value_items, web_socket::{WSClientData, WebSocketReceiver}, }, }; @@ -76,9 +77,9 @@ impl WebSocketReceiver for DocumentWebSocketReceiver { } } -pub struct HttpDocumentCloudPersistence(pub Arc); +pub struct HttpDocumentCloudPersistence(pub Arc); impl Debug for HttpDocumentCloudPersistence { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.write_str("DocumentPersistenceImpl") } + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.write_str("HttpDocumentCloudPersistence") } } impl DocumentCloudPersistence for HttpDocumentCloudPersistence { @@ -87,11 +88,11 @@ impl DocumentCloudPersistence for HttpDocumentCloudPersistence { doc_id: doc_id.to_string(), ..Default::default() }; - let kv_store = self.0.clone(); + let document_store = self.0.clone(); Box::pin(async move { - let mut pb_doc = read_document(&kv_store, params) + let mut pb_doc = read_document(&document_store, params) .await - .map_err(server_error_to_collaborate_error)?; + .map_err(|e| e.to_collaborate_error())?; let doc = (&mut pb_doc) .try_into() .map_err(|e| CollaborateError::internal().context(e))?; @@ -104,7 +105,7 @@ impl DocumentCloudPersistence for HttpDocumentCloudPersistence { doc_id: &str, repeated_revision: RepeatedRevisionPB, ) -> BoxResultFuture, CollaborateError> { - let kv_store = self.0.clone(); + let document_store = self.0.clone(); let doc_id = doc_id.to_owned(); Box::pin(async move { let document_info = make_document_info_from_revisions_pb(&doc_id, repeated_revision.clone())?; @@ -112,9 +113,9 @@ impl DocumentCloudPersistence for HttpDocumentCloudPersistence { let mut params = CreateDocParamsPB::new(); params.set_id(doc_id); params.set_revisions(repeated_revision); - let _ = create_document(&kv_store, params) + let _ = create_document(&document_store, params) .await - .map_err(server_error_to_collaborate_error)?; + .map_err(|e| e.to_collaborate_error())?; Ok(document_info) }) } @@ -124,28 +125,28 @@ impl DocumentCloudPersistence for HttpDocumentCloudPersistence { doc_id: &str, rev_ids: Option>, ) -> BoxResultFuture, CollaborateError> { - let kv_store = self.0.clone(); + let document_store = self.0.clone(); let doc_id = doc_id.to_owned(); let f = || async move { - let mut repeated_revision = kv_store.get_revisions(&doc_id, rev_ids).await?; - Ok(repeated_revision.take_items().into()) + let mut repeated_revision = document_store.get_revisions(&doc_id, rev_ids).await?; + Ok::, ServerError>(repeated_revision.take_items().into()) }; - Box::pin(async move { f().await.map_err(server_error_to_collaborate_error) }) + Box::pin(async move { f().await.map_err(|e| e.to_collaborate_error()) }) } fn save_document_revisions( &self, mut repeated_revision: RepeatedRevisionPB, ) -> BoxResultFuture<(), CollaborateError> { - let kv_store = self.0.clone(); + let document_store = self.0.clone(); let f = || async move { let revisions = repeated_revision.take_items().into(); - let _ = kv_store.set_revision(revisions).await?; - Ok(()) + let _ = document_store.set_revision(revisions).await?; + Ok::<(), ServerError>(()) }; - Box::pin(async move { f().await.map_err(server_error_to_collaborate_error) }) + Box::pin(async move { f().await.map_err(|e| e.to_collaborate_error()) }) } fn reset_document( @@ -153,10 +154,10 @@ impl DocumentCloudPersistence for HttpDocumentCloudPersistence { doc_id: &str, mut repeated_revision: RepeatedRevisionPB, ) -> BoxResultFuture<(), CollaborateError> { - let kv_store = self.0.clone(); + let document_store = self.0.clone(); let doc_id = doc_id.to_owned(); let f = || async move { - kv_store + document_store .transaction(|mut transaction| { Box::pin(async move { let _ = transaction.batch_delete_key_start_with(&doc_id).await?; @@ -167,14 +168,6 @@ impl DocumentCloudPersistence for HttpDocumentCloudPersistence { }) .await }; - Box::pin(async move { f().await.map_err(server_error_to_collaborate_error) }) - } -} - -fn server_error_to_collaborate_error(error: ServerError) -> CollaborateError { - if error.is_record_not_found() { - CollaborateError::record_not_found() - } else { - CollaborateError::internal().context(error) + Box::pin(async move { f().await.map_err(|e| e.to_collaborate_error()) }) } } diff --git a/backend/src/services/folder/trash/router.rs b/backend/src/services/folder/trash/router.rs index 155bb3f6a3..b19e9e1125 100644 --- a/backend/src/services/folder/trash/router.rs +++ b/backend/src/services/folder/trash/router.rs @@ -48,7 +48,7 @@ pub async fn delete_handler( logged_user: LoggedUser, ) -> Result { let pool = persistence.pg_pool(); - let kv_store = persistence.kv_store(); + let kv_store = persistence.document_kv_store(); let params: RepeatedTrashId = parse_from_payload(payload).await?; let mut transaction = pool .begin() diff --git a/backend/src/services/folder/trash/trash.rs b/backend/src/services/folder/trash/trash.rs index 713b7be79e..e0bdab571f 100644 --- a/backend/src/services/folder/trash/trash.rs +++ b/backend/src/services/folder/trash/trash.rs @@ -1,12 +1,10 @@ use crate::{ + context::DocumentRevisionKV, entities::logged_user::LoggedUser, - services::{ - document::persistence::DocumentKVPersistence, - folder::{ - app::controller::{delete_app, read_app_table}, - trash::persistence::{TrashTable, TRASH_TABLE}, - view::{delete_view, read_view_table}, - }, + services::folder::{ + app::controller::{delete_app, read_app_table}, + trash::persistence::{TrashTable, TRASH_TABLE}, + view::{delete_view, read_view_table}, }, util::sqlx_ext::{map_sqlx_error, DBTransaction, SqlBuilder}, }; @@ -39,10 +37,10 @@ pub(crate) async fn create_trash( Ok(()) } -#[tracing::instrument(skip(transaction, kv_store, user), fields(delete_rows), err)] +#[tracing::instrument(skip(transaction, document_store, user), fields(delete_rows), err)] pub(crate) async fn delete_all_trash( transaction: &mut DBTransaction<'_>, - kv_store: &Arc, + document_store: &Arc, user: &LoggedUser, ) -> Result<(), ServerError> { let (sql, args) = SqlBuilder::select(TRASH_TABLE) @@ -57,7 +55,7 @@ pub(crate) async fn delete_all_trash( .collect::>(); tracing::Span::current().record("delete_rows", &format!("{:?}", rows).as_str()); let affected_row_count = rows.len(); - let _ = delete_trash_associate_targets(transaction as &mut DBTransaction<'_>, kv_store, rows).await?; + let _ = delete_trash_associate_targets(transaction as &mut DBTransaction<'_>, document_store, rows).await?; let (sql, args) = SqlBuilder::delete(TRASH_TABLE) .and_where_eq("user_id", &user.user_id) @@ -72,10 +70,10 @@ pub(crate) async fn delete_all_trash( Ok(()) } -#[tracing::instrument(skip(transaction, kv_store), err)] +#[tracing::instrument(skip(transaction, document_store), err)] pub(crate) async fn delete_trash( transaction: &mut DBTransaction<'_>, - kv_store: &Arc, + document_store: &Arc, records: Vec<(Uuid, i32)>, ) -> Result<(), ServerError> { for (trash_id, _) in records { @@ -92,7 +90,7 @@ pub(crate) async fn delete_trash( let _ = delete_trash_associate_targets( transaction as &mut DBTransaction<'_>, - kv_store, + document_store, vec![(trash_table.id, trash_table.ty)], ) .await?; @@ -107,10 +105,10 @@ pub(crate) async fn delete_trash( Ok(()) } -#[tracing::instrument(skip(transaction, kv_store, targets), err)] +#[tracing::instrument(skip(transaction, document_store, targets), err)] async fn delete_trash_associate_targets( transaction: &mut DBTransaction<'_>, - kv_store: &Arc, + document_store: &Arc, targets: Vec<(Uuid, i32)>, ) -> Result<(), ServerError> { for (id, ty) in targets { @@ -119,7 +117,7 @@ async fn delete_trash_associate_targets( Some(ty) => match ty { TrashType::Unknown => {}, TrashType::View => { - let _ = delete_view(transaction as &mut DBTransaction<'_>, kv_store, vec![id]).await; + let _ = delete_view(transaction as &mut DBTransaction<'_>, document_store, vec![id]).await; }, TrashType::App => { let _ = delete_app(transaction as &mut DBTransaction<'_>, id).await; diff --git a/backend/src/services/folder/view/controller.rs b/backend/src/services/folder/view/controller.rs index 9bd18c874f..a5f277879d 100644 --- a/backend/src/services/folder/view/controller.rs +++ b/backend/src/services/folder/view/controller.rs @@ -1,13 +1,14 @@ use crate::{ entities::logged_user::LoggedUser, services::{ - document::persistence::{create_document, delete_document, DocumentKVPersistence}, + document::persistence::{create_document, delete_document}, folder::{trash::read_trash_ids, view::persistence::*}, }, util::sqlx_ext::{map_sqlx_error, DBTransaction, SqlBuilder}, }; use backend_service::errors::{invalid_params, ServerError}; +use crate::context::DocumentRevisionKV; use chrono::Utc; use flowy_collaboration::{ client_document::default::initial_delta, @@ -48,10 +49,10 @@ pub(crate) async fn update_view( Ok(()) } -#[tracing::instrument(skip(transaction, kv_store), err)] +#[tracing::instrument(skip(transaction, document_store), err)] pub(crate) async fn delete_view( transaction: &mut DBTransaction<'_>, - kv_store: &Arc, + document_store: &Arc, view_ids: Vec, ) -> Result<(), ServerError> { for view_id in view_ids { @@ -61,15 +62,15 @@ pub(crate) async fn delete_view( .await .map_err(map_sqlx_error)?; - let _ = delete_document(kv_store, view_id).await?; + let _ = delete_document(document_store, view_id).await?; } Ok(()) } -#[tracing::instrument(name = "create_view", level = "debug", skip(transaction, kv_store), err)] +#[tracing::instrument(name = "create_view", level = "debug", skip(transaction, document_store), err)] pub(crate) async fn create_view( transaction: &mut DBTransaction<'_>, - kv_store: Arc, + document_store: Arc, params: CreateViewParamsPB, user_id: &str, ) -> Result { @@ -98,7 +99,7 @@ pub(crate) async fn create_view( let mut create_doc_params = CreateDocParamsPB::new(); create_doc_params.set_revisions(repeated_revision.try_into().unwrap()); create_doc_params.set_id(view.id.clone()); - let _ = create_document(&kv_store, create_doc_params).await?; + let _ = create_document(&document_store, create_doc_params).await?; Ok(view) } diff --git a/backend/src/services/folder/view/router.rs b/backend/src/services/folder/view/router.rs index a62d789924..fd53616dbc 100644 --- a/backend/src/services/folder/view/router.rs +++ b/backend/src/services/folder/view/router.rs @@ -37,7 +37,7 @@ pub async fn create_handler( user: LoggedUser, ) -> Result { let params: CreateViewParamsPB = parse_from_payload(payload).await?; - let kv_store = persistence.kv_store(); + let kv_store = persistence.document_kv_store(); let pool = persistence.pg_pool(); let mut transaction = pool .begin() @@ -114,7 +114,7 @@ pub async fn delete_handler( ) -> Result { let params: QueryViewRequestPB = parse_from_payload(payload).await?; let pool = persistence.pg_pool(); - let kv_store = persistence.kv_store(); + let kv_store = persistence.document_kv_store(); let view_ids = check_view_ids(params.view_ids.to_vec())?; let mut transaction = pool .begin() diff --git a/backend/src/services/folder/ws_actor.rs b/backend/src/services/folder/ws_actor.rs index 63d23b6420..a09723b4fd 100644 --- a/backend/src/services/folder/ws_actor.rs +++ b/backend/src/services/folder/ws_actor.rs @@ -12,6 +12,7 @@ use flowy_collaboration::{ ClientRevisionWSData as ClientRevisionWSDataPB, ClientRevisionWSDataType as ClientRevisionWSDataTypePB, }, + server_folder::ServerFolderManager, synchronizer::{RevisionSyncResponse, RevisionUser}, }; use futures::stream::StreamExt; @@ -29,12 +30,14 @@ pub enum FolderWSActorMessage { pub struct FolderWebSocketActor { receiver: Option>, + folder_manager: Arc, } impl FolderWebSocketActor { - pub fn new(receiver: mpsc::Receiver) -> Self { + pub fn new(receiver: mpsc::Receiver, folder_manager: Arc) -> Self { Self { receiver: Some(receiver), + folder_manager, } } @@ -79,13 +82,21 @@ impl FolderWebSocketActor { folder_client_data.ty ); - let _user = Arc::new(FolderRevisionUser { user, socket }); + let user = Arc::new(FolderRevisionUser { user, socket }); match &folder_client_data.ty { ClientRevisionWSDataTypePB::ClientPushRev => { - todo!() + let _ = self + .folder_manager + .handle_client_revisions(user, folder_client_data) + .await + .map_err(internal_error)?; }, ClientRevisionWSDataTypePB::ClientPing => { - todo!() + let _ = self + .folder_manager + .handle_client_ping(user, folder_client_data) + .await + .map_err(internal_error)?; }, } Ok(()) diff --git a/backend/src/services/folder/ws_receiver.rs b/backend/src/services/folder/ws_receiver.rs index 282e1420dd..a1dc95bbaf 100644 --- a/backend/src/services/folder/ws_receiver.rs +++ b/backend/src/services/folder/ws_receiver.rs @@ -5,13 +5,26 @@ use crate::{ web_socket::{WSClientData, WebSocketReceiver}, }, }; +use std::fmt::{Debug, Formatter}; +use crate::{context::FolderRevisionKV, services::kv::revision_kv::revisions_to_key_value_items}; +use flowy_collaboration::{ + entities::folder_info::FolderInfo, + errors::CollaborateError, + protobuf::{RepeatedRevision as RepeatedRevisionPB, Revision as RevisionPB}, + server_folder::{FolderCloudPersistence, ServerFolderManager}, + util::make_folder_from_revisions_pb, +}; +use lib_infra::future::BoxResultFuture; use std::sync::Arc; use tokio::sync::{mpsc, oneshot}; -pub fn make_folder_ws_receiver(persistence: Arc) -> Arc { +pub fn make_folder_ws_receiver( + persistence: Arc, + folder_manager: Arc, +) -> Arc { let (ws_sender, rx) = tokio::sync::mpsc::channel(1000); - let actor = FolderWebSocketActor::new(rx); + let actor = FolderWebSocketActor::new(rx, folder_manager); tokio::task::spawn(actor.run()); Arc::new(FolderWebSocketReceiver::new(persistence, ws_sender)) } @@ -51,3 +64,99 @@ impl WebSocketReceiver for FolderWebSocketReceiver { }); } } + +pub struct HttpFolderCloudPersistence(pub Arc); +impl Debug for HttpFolderCloudPersistence { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.write_str("HttpFolderCloudPersistence") } +} + +impl FolderCloudPersistence for HttpFolderCloudPersistence { + fn read_folder(&self, _user_id: &str, folder_id: &str) -> BoxResultFuture { + let folder_store = self.0.clone(); + let folder_id = folder_id.to_owned(); + Box::pin(async move { + let revisions = folder_store + .get_revisions(&folder_id, None) + .await + .map_err(|e| e.to_collaborate_error())?; + match make_folder_from_revisions_pb(&folder_id, revisions)? { + Some(folder_info) => Ok(folder_info), + None => Err(CollaborateError::record_not_found().context(format!("{} not exist", folder_id))), + } + }) + } + + fn create_folder( + &self, + _user_id: &str, + folder_id: &str, + mut repeated_revision: RepeatedRevisionPB, + ) -> BoxResultFuture, CollaborateError> { + let folder_store = self.0.clone(); + let folder_id = folder_id.to_owned(); + Box::pin(async move { + let folder_info = make_folder_from_revisions_pb(&folder_id, repeated_revision.clone())?; + let revisions: Vec = repeated_revision.take_items().into(); + let _ = folder_store + .set_revision(revisions) + .await + .map_err(|e| e.to_collaborate_error())?; + Ok(folder_info) + }) + } + + fn save_folder_revisions( + &self, + mut repeated_revision: RepeatedRevisionPB, + ) -> BoxResultFuture<(), CollaborateError> { + let folder_store = self.0.clone(); + Box::pin(async move { + let revisions: Vec = repeated_revision.take_items().into(); + let _ = folder_store + .set_revision(revisions) + .await + .map_err(|e| e.to_collaborate_error())?; + Ok(()) + }) + } + + fn read_folder_revisions( + &self, + folder_id: &str, + rev_ids: Option>, + ) -> BoxResultFuture, CollaborateError> { + let folder_store = self.0.clone(); + let folder_id = folder_id.to_owned(); + Box::pin(async move { + let mut repeated_revision = folder_store + .get_revisions(&folder_id, rev_ids) + .await + .map_err(|e| e.to_collaborate_error())?; + let revisions: Vec = repeated_revision.take_items().into(); + Ok(revisions) + }) + } + + fn reset_folder( + &self, + folder_id: &str, + mut repeated_revision: RepeatedRevisionPB, + ) -> BoxResultFuture<(), CollaborateError> { + let folder_store = self.0.clone(); + let folder_id = folder_id.to_owned(); + Box::pin(async move { + let _ = folder_store + .transaction(|mut transaction| { + Box::pin(async move { + let _ = transaction.batch_delete_key_start_with(&folder_id).await?; + let items = revisions_to_key_value_items(repeated_revision.take_items().into())?; + let _ = transaction.batch_set(items).await?; + Ok(()) + }) + }) + .await + .map_err(|e| e.to_collaborate_error())?; + Ok(()) + }) + } +} diff --git a/backend/src/services/kv/mod.rs b/backend/src/services/kv/mod.rs index 540e693ea1..4a14383b61 100644 --- a/backend/src/services/kv/mod.rs +++ b/backend/src/services/kv/mod.rs @@ -1,5 +1,6 @@ #![allow(clippy::module_inception)] mod kv; +pub mod revision_kv; use async_trait::async_trait; use bytes::Bytes; diff --git a/backend/src/services/kv/revision_kv.rs b/backend/src/services/kv/revision_kv.rs new file mode 100644 index 0000000000..dff6b85ab9 --- /dev/null +++ b/backend/src/services/kv/revision_kv.rs @@ -0,0 +1,120 @@ +use crate::{ + services::kv::{KVStore, KeyValue}, + util::serde_ext::parse_from_bytes, +}; +use backend_service::errors::ServerError; +use bytes::Bytes; +use flowy_collaboration::protobuf::{RepeatedRevision as RepeatedRevisionPB, Revision as RevisionPB}; + +use protobuf::Message; +use std::sync::Arc; + +pub struct RevisionKVPersistence { + inner: Arc, +} + +impl std::ops::Deref for RevisionKVPersistence { + type Target = Arc; + + fn deref(&self) -> &Self::Target { &self.inner } +} + +impl std::ops::DerefMut for RevisionKVPersistence { + fn deref_mut(&mut self) -> &mut Self::Target { &mut self.inner } +} + +impl RevisionKVPersistence { + pub(crate) fn new(kv_store: Arc) -> Self { RevisionKVPersistence { inner: kv_store } } + + pub(crate) async fn set_revision(&self, revisions: Vec) -> Result<(), ServerError> { + let items = revisions_to_key_value_items(revisions)?; + self.inner + .transaction(|mut t| Box::pin(async move { t.batch_set(items).await })) + .await + } + + pub(crate) async fn get_revisions>>>( + &self, + object_id: &str, + rev_ids: T, + ) -> Result { + let rev_ids = rev_ids.into(); + let items = match rev_ids { + None => { + let object_id = object_id.to_owned(); + self.inner + .transaction(|mut t| Box::pin(async move { t.batch_get_start_with(&object_id).await })) + .await? + }, + Some(rev_ids) => { + let keys = rev_ids + .into_iter() + .map(|rev_id| make_revision_key(object_id, rev_id)) + .collect::>(); + + self.inner + .transaction(|mut t| Box::pin(async move { t.batch_get(keys).await })) + .await? + }, + }; + + Ok(key_value_items_to_revisions(items)) + } + + pub(crate) async fn delete_revisions>>>( + &self, + object_id: &str, + rev_ids: T, + ) -> Result<(), ServerError> { + match rev_ids.into() { + None => { + let object_id = object_id.to_owned(); + self.inner + .transaction(|mut t| Box::pin(async move { t.batch_delete_key_start_with(&object_id).await })) + .await + }, + Some(rev_ids) => { + let keys = rev_ids + .into_iter() + .map(|rev_id| make_revision_key(object_id, rev_id)) + .collect::>(); + + self.inner + .transaction(|mut t| Box::pin(async move { t.batch_delete(keys).await })) + .await + }, + } + } +} + +#[inline] +pub fn revisions_to_key_value_items(revisions: Vec) -> Result, ServerError> { + let mut items = vec![]; + for revision in revisions { + let key = make_revision_key(&revision.object_id, revision.rev_id); + + if revision.delta_data.is_empty() { + return Err(ServerError::internal().context("The delta_data of RevisionPB should not be empty")); + } + + let value = Bytes::from(revision.write_to_bytes().unwrap()); + items.push(KeyValue { key, value }); + } + Ok(items) +} + +#[inline] +fn key_value_items_to_revisions(items: Vec) -> RepeatedRevisionPB { + let mut revisions = items + .into_iter() + .filter_map(|kv| parse_from_bytes::(&kv.value).ok()) + .collect::>(); + + revisions.sort_by(|a, b| a.rev_id.cmp(&b.rev_id)); + let mut repeated_revision = RepeatedRevisionPB::new(); + repeated_revision.set_items(revisions.into()); + repeated_revision +} + +#[inline] +fn make_revision_key(object_id: &str, rev_id: i64) -> String { format!("{}:{}", object_id, rev_id) } diff --git a/backend/tests/api_test/kv_test.rs b/backend/tests/api_test/kv_test.rs index 302fd5681e..3765270ef3 100644 --- a/backend/tests/api_test/kv_test.rs +++ b/backend/tests/api_test/kv_test.rs @@ -5,7 +5,7 @@ use std::str; #[actix_rt::test] async fn kv_set_test() { let server = spawn_server().await; - let kv = server.app_ctx.persistence.kv_store(); + let kv = server.app_ctx.persistence.document_kv_store(); let s1 = "123".to_string(); let key = "1"; @@ -18,7 +18,7 @@ async fn kv_set_test() { #[actix_rt::test] async fn kv_delete_test() { let server = spawn_server().await; - let kv = server.app_ctx.persistence.kv_store(); + let kv = server.app_ctx.persistence.document_kv_store(); let s1 = "123".to_string(); let key = "1"; @@ -30,7 +30,7 @@ async fn kv_delete_test() { #[actix_rt::test] async fn kv_batch_set_test() { let server = spawn_server().await; - let kv = server.app_ctx.persistence.kv_store(); + let kv = server.app_ctx.persistence.document_kv_store(); let kvs = vec![ KeyValue { key: "1".to_string(), @@ -54,7 +54,7 @@ async fn kv_batch_set_test() { #[actix_rt::test] async fn kv_batch_get_start_with_test() { let server = spawn_server().await; - let kv = server.app_ctx.persistence.kv_store(); + let kv = server.app_ctx.persistence.document_kv_store(); let kvs = vec![ KeyValue { key: "abc:1".to_string(), diff --git a/backend/tests/document_test/edit_script.rs b/backend/tests/document_test/edit_script.rs index 8c50368d9c..d0b7a05bf5 100644 --- a/backend/tests/document_test/edit_script.rs +++ b/backend/tests/document_test/edit_script.rs @@ -113,7 +113,7 @@ async fn run_scripts(context: Arc>, scripts: Vec { sleep(Duration::from_millis(2000)).await; - let persistence = Data::new(context.read().server.app_ctx.persistence.kv_store()); + let persistence = Data::new(context.read().server.app_ctx.persistence.document_kv_store()); let doc_identifier: DocumentIdPB = DocumentId { doc_id }.try_into().unwrap(); diff --git a/frontend/app_flowy/lib/workspace/infrastructure/repos/trash_repo.dart b/frontend/app_flowy/lib/workspace/infrastructure/repos/trash_repo.dart index a746901c94..5a40092897 100644 --- a/frontend/app_flowy/lib/workspace/infrastructure/repos/trash_repo.dart +++ b/frontend/app_flowy/lib/workspace/infrastructure/repos/trash_repo.dart @@ -33,11 +33,11 @@ class TrashRepo { } Future> restoreAll() { - return WorkspaceEventRestoreAll().send(); + return WorkspaceEventRestoreAllTrash().send(); } Future> deleteAll() { - return WorkspaceEventDeleteAll().send(); + return WorkspaceEventDeleteAllTrash().send(); } } diff --git a/frontend/app_flowy/packages/flowy_sdk/lib/dispatch/dispatch.dart b/frontend/app_flowy/packages/flowy_sdk/lib/dispatch/dispatch.dart index 1dd5d2728a..a10ba4fafb 100644 --- a/frontend/app_flowy/packages/flowy_sdk/lib/dispatch/dispatch.dart +++ b/frontend/app_flowy/packages/flowy_sdk/lib/dispatch/dispatch.dart @@ -4,6 +4,7 @@ import 'package:flowy_log/flowy_log.dart'; // ignore: unnecessary_import import 'package:flowy_sdk/protobuf/dart-ffi/ffi_response.pb.dart'; import 'package:flowy_sdk/protobuf/dart-ffi/ffi_request.pb.dart'; +import 'package:flowy_sdk/protobuf/flowy-collaboration/document_info.pb.dart'; import 'package:flowy_sdk/protobuf/flowy-error/errors.pb.dart'; import 'package:flowy_sdk/protobuf/flowy-net/event.pb.dart'; import 'package:flowy_sdk/protobuf/flowy-net/network_state.pb.dart'; diff --git a/shared-lib/backend-service/src/errors.rs b/shared-lib/backend-service/src/errors.rs index 143e39d559..2b11f111c1 100644 --- a/shared-lib/backend-service/src/errors.rs +++ b/shared-lib/backend-service/src/errors.rs @@ -5,7 +5,7 @@ use serde_repr::*; use std::{fmt, fmt::Debug}; pub type Result = std::result::Result; - +use flowy_collaboration::errors::CollaborateError; #[derive(thiserror::Error, Debug, Serialize, Deserialize, Clone)] pub struct ServerError { pub code: ErrorCode, @@ -47,6 +47,14 @@ impl ServerError { pub fn is_record_not_found(&self) -> bool { self.code == ErrorCode::RecordNotFound } pub fn is_unauthorized(&self) -> bool { self.code == ErrorCode::UserUnauthorized } + + pub fn to_collaborate_error(&self) -> CollaborateError { + if self.is_record_not_found() { + CollaborateError::record_not_found() + } else { + CollaborateError::internal().context(self.msg.clone()) + } + } } pub fn internal_error(e: T) -> ServerError