diff --git a/frontend/rust-lib/flowy-document/src/services/doc/edit/editor.rs b/frontend/rust-lib/flowy-document/src/services/doc/edit/editor.rs index 30cd310933..a1554afe4b 100644 --- a/frontend/rust-lib/flowy-document/src/services/doc/edit/editor.rs +++ b/frontend/rust-lib/flowy-document/src/services/doc/edit/editor.rs @@ -4,7 +4,7 @@ use crate::{ services::{ doc::{ edit::{EditCommand, EditCommandQueue, OpenDocAction, TransformDeltas}, - revision::{RevisionDownStream, RevisionManager, SteamStopRx, SteamStopTx}, + revision::{RevisionDownStream, RevisionManager, SteamStopTx}, }, ws::{DocumentWebSocket, WsDocumentHandler}, }, @@ -31,7 +31,7 @@ pub type DocId = String; pub struct ClientDocEditor { pub doc_id: DocId, rev_manager: Arc, - edit_tx: UnboundedSender, + edit_cmd_tx: UnboundedSender, ws_sender: Arc, user: Arc, ws_msg_tx: UnboundedSender, @@ -47,7 +47,7 @@ impl ClientDocEditor { ws_sender: Arc, ) -> DocResult> { let delta = rev_manager.load_document().await?; - let edit_queue_tx = spawn_edit_queue(doc_id, delta, pool.clone()); + let edit_cmd_tx = spawn_edit_queue(doc_id, delta, pool.clone()); let doc_id = doc_id.to_string(); let rev_manager = Arc::new(rev_manager); let (ws_msg_tx, ws_msg_rx) = mpsc::unbounded_channel(); @@ -56,12 +56,13 @@ impl ClientDocEditor { let edit_doc = Arc::new(Self { doc_id, rev_manager, - edit_tx: edit_queue_tx, + edit_cmd_tx, + ws_sender, user, ws_msg_tx, - ws_sender, stop_sync_tx, }); + edit_doc.notify_open_doc(); start_sync(edit_doc.clone(), ws_msg_rx, cloned_stop_sync_tx); @@ -75,7 +76,7 @@ impl ClientDocEditor { data: data.to_string(), ret, }; - let _ = self.edit_tx.send(msg); + let _ = self.edit_cmd_tx.send(msg); let delta = rx.await.map_err(internal_error)??; let _ = self.save_local_delta(delta).await?; Ok(()) @@ -84,7 +85,7 @@ impl ClientDocEditor { pub async fn delete(&self, interval: Interval) -> Result<(), DocError> { let (ret, rx) = oneshot::channel::>(); let msg = EditCommand::Delete { interval, ret }; - let _ = self.edit_tx.send(msg); + let _ = self.edit_cmd_tx.send(msg); let delta = rx.await.map_err(internal_error)??; let _ = self.save_local_delta(delta).await?; Ok(()) @@ -97,7 +98,7 @@ impl ClientDocEditor { attribute, ret, }; - let _ = self.edit_tx.send(msg); + let _ = self.edit_cmd_tx.send(msg); let delta = rx.await.map_err(internal_error)??; let _ = self.save_local_delta(delta).await?; Ok(()) @@ -110,7 +111,7 @@ impl ClientDocEditor { data: data.to_string(), ret, }; - let _ = self.edit_tx.send(msg); + let _ = self.edit_cmd_tx.send(msg); let delta = rx.await.map_err(internal_error)??; let _ = self.save_local_delta(delta).await?; Ok(()) @@ -119,21 +120,21 @@ impl ClientDocEditor { pub async fn can_undo(&self) -> bool { let (ret, rx) = oneshot::channel::(); let msg = EditCommand::CanUndo { ret }; - let _ = self.edit_tx.send(msg); + let _ = self.edit_cmd_tx.send(msg); rx.await.unwrap_or(false) } pub async fn can_redo(&self) -> bool { let (ret, rx) = oneshot::channel::(); let msg = EditCommand::CanRedo { ret }; - let _ = self.edit_tx.send(msg); + let _ = self.edit_cmd_tx.send(msg); rx.await.unwrap_or(false) } pub async fn undo(&self) -> Result { let (ret, rx) = oneshot::channel::>(); let msg = EditCommand::Undo { ret }; - let _ = self.edit_tx.send(msg); + let _ = self.edit_cmd_tx.send(msg); let r = rx.await.map_err(internal_error)??; Ok(r) } @@ -141,7 +142,7 @@ impl ClientDocEditor { pub async fn redo(&self) -> Result { let (ret, rx) = oneshot::channel::>(); let msg = EditCommand::Redo { ret }; - let _ = self.edit_tx.send(msg); + let _ = self.edit_cmd_tx.send(msg); let r = rx.await.map_err(internal_error)??; Ok(r) } @@ -149,7 +150,7 @@ impl ClientDocEditor { pub async fn delta(&self) -> DocResult { let (ret, rx) = oneshot::channel::>(); let msg = EditCommand::ReadDoc { ret }; - let _ = self.edit_tx.send(msg); + let _ = self.edit_cmd_tx.send(msg); let data = rx.await.map_err(internal_error)??; Ok(DocDelta { @@ -175,16 +176,16 @@ impl ClientDocEditor { delta: delta.clone(), ret, }; - let _ = self.edit_tx.send(msg); + let _ = self.edit_cmd_tx.send(msg); let _ = rx.await.map_err(internal_error)??; let _ = self.save_local_delta(delta).await?; Ok(()) } - #[tracing::instrument(level = "debug", skip(self), fields(doc_id))] + #[tracing::instrument(level = "debug", skip(self))] pub fn stop_sync(&self) { - tracing::Span::current().record("doc_id", &self.doc_id.as_str()); + tracing::debug!("{} stop sync", self.doc_id); let _ = self.stop_sync_tx.send(()); } @@ -208,7 +209,7 @@ impl ClientDocEditor { pub(crate) async fn handle_push_rev(&self, bytes: Bytes) -> DocResult<()> { // Transform the revision let (ret, rx) = oneshot::channel::>(); - let _ = self.edit_tx.send(EditCommand::ProcessRemoteRevision { bytes, ret }); + let _ = self.edit_cmd_tx.send(EditCommand::ProcessRemoteRevision { bytes, ret }); let TransformDeltas { client_prime, server_prime, @@ -226,7 +227,7 @@ impl ClientDocEditor { delta: client_prime.clone(), ret, }; - let _ = self.edit_tx.send(msg); + let _ = self.edit_cmd_tx.send(msg); let _ = rx.await.map_err(internal_error)??; // update rev id @@ -256,10 +257,12 @@ impl ClientDocEditor { Ok(()) } - async fn handle_ws_message(&self, doc_data: WsDocumentData) -> DocResult<()> { + pub async fn handle_ws_message(&self, doc_data: WsDocumentData) -> DocResult<()> { match self.ws_msg_tx.send(doc_data) { - Ok(_) => {}, - Err(e) => log::error!("Propagate ws message data failed. {}", e), + Ok(_) => { + tracing::debug!("Propagate ws message data success") + }, + Err(e) => tracing::error!("Propagate ws message data failed. {}", e), } Ok(()) } @@ -320,7 +323,7 @@ impl ClientDocEditor { pub async fn doc_json(&self) -> DocResult { let (ret, rx) = oneshot::channel::>(); let msg = EditCommand::ReadDoc { ret }; - let _ = self.edit_tx.send(msg); + let _ = self.edit_cmd_tx.send(msg); let s = rx.await.map_err(internal_error)??; Ok(s) } @@ -328,7 +331,7 @@ impl ClientDocEditor { pub async fn doc_delta(&self) -> DocResult { let (ret, rx) = oneshot::channel::>(); let msg = EditCommand::ReadDocDelta { ret }; - let _ = self.edit_tx.send(msg); + let _ = self.edit_cmd_tx.send(msg); let delta = rx.await.map_err(internal_error)??; Ok(delta) } diff --git a/frontend/rust-lib/flowy-document/src/services/doc/revision/cache.rs b/frontend/rust-lib/flowy-document/src/services/doc/revision/cache.rs index 4649acbdb4..c45de3f26c 100644 --- a/frontend/rust-lib/flowy-document/src/services/doc/revision/cache.rs +++ b/frontend/rust-lib/flowy-document/src/services/doc/revision/cache.rs @@ -8,16 +8,7 @@ use flowy_document_infra::entities::doc::Doc; use lib_infra::future::ResultFuture; use lib_ot::{ core::{Operation, OperationTransformable}, - revision::{ - RevId, - RevState, - RevType, - Revision, - RevisionDiskCache, - RevisionMemoryCache, - RevisionRange, - RevisionRecord, - }, + revision::{RevState, RevType, Revision, RevisionDiskCache, RevisionMemoryCache, RevisionRange, RevisionRecord}, rich_text::RichTextDelta, }; use std::{sync::Arc, time::Duration}; @@ -64,13 +55,16 @@ impl RevisionCache { Ok(()) } - #[tracing::instrument(level = "debug", skip(self, rev_id), fields(rev_id = %rev_id.as_ref()))] - pub async fn ack_revision(&self, rev_id: RevId) { - let rev_id = rev_id.value; - self.memory_cache.mut_revision(&rev_id, |mut rev| rev.value_mut().ack()); + #[tracing::instrument(level = "debug", skip(self, rev_id), fields(rev_id = %rev_id))] + pub async fn ack_revision(&self, rev_id: i64) { + self.memory_cache.ack_revision(&rev_id).await; self.save_revisions().await; } + pub async fn query_revision(&self, rev_id: i64) -> Option { + self.memory_cache.query_revision(&rev_id).await + } + async fn save_revisions(&self) { if let Some(handler) = self.defer_save.write().await.take() { handler.abort(); diff --git a/frontend/rust-lib/flowy-document/src/services/doc/revision/manager.rs b/frontend/rust-lib/flowy-document/src/services/doc/revision/manager.rs index 5b28ada95c..699046efa1 100644 --- a/frontend/rust-lib/flowy-document/src/services/doc/revision/manager.rs +++ b/frontend/rust-lib/flowy-document/src/services/doc/revision/manager.rs @@ -49,7 +49,7 @@ impl RevisionManager { } pub async fn ack_revision(&self, rev_id: RevId) -> Result<(), DocError> { - self.cache.ack_revision(rev_id).await; + self.cache.ack_revision(rev_id.into()).await; Ok(()) } @@ -89,7 +89,7 @@ impl RevisionManager { } pub(crate) fn make_up_stream(&self, stop_rx: SteamStopRx) -> RevisionUpStream { - RevisionUpStream::new(self.cache.clone(), self.ws_sender.clone(), stop_rx) + RevisionUpStream::new(&self.doc_id, self.cache.clone(), self.ws_sender.clone(), stop_rx) } } diff --git a/frontend/rust-lib/flowy-document/src/services/doc/revision/sync.rs b/frontend/rust-lib/flowy-document/src/services/doc/revision/sync.rs index 315dcc1f8c..68f39d5590 100644 --- a/frontend/rust-lib/flowy-document/src/services/doc/revision/sync.rs +++ b/frontend/rust-lib/flowy-document/src/services/doc/revision/sync.rs @@ -23,7 +23,7 @@ use tokio::{ pub(crate) struct RevisionDownStream { editor: Arc, rev_manager: Arc, - receiver: Option>, + ws_msg_rx: Option>, ws_sender: Arc, stop_rx: Option, } @@ -32,36 +32,36 @@ impl RevisionDownStream { pub(crate) fn new( editor: Arc, rev_manager: Arc, - receiver: mpsc::UnboundedReceiver, + ws_msg_rx: mpsc::UnboundedReceiver, ws_sender: Arc, stop_rx: SteamStopRx, ) -> Self { RevisionDownStream { editor, rev_manager, - receiver: Some(receiver), + ws_msg_rx: Some(ws_msg_rx), ws_sender, stop_rx: Some(stop_rx), } } pub async fn run(mut self) { - let mut receiver = self.receiver.take().expect("Only take once"); + let mut receiver = self.ws_msg_rx.take().expect("Only take once"); let mut stop_rx = self.stop_rx.take().expect("Only take once"); + let doc_id = self.editor.doc_id.clone(); let stream = stream! { loop { - // match receiver.recv().await { - // Some(msg) => yield msg, - // None => break, - // } tokio::select! { result = receiver.recv() => { match result { Some(msg) => yield msg, - None => break, + None => {}, } }, - _ = stop_rx.recv() => break, + _ = stop_rx.recv() => { + tracing::debug!("[RevisionDownStream:{}] loop exit", doc_id); + break + }, }; } }; @@ -80,8 +80,8 @@ impl RevisionDownStream { let bytes = spawn_blocking(move || Bytes::from(data)) .await .map_err(internal_error)?; - log::debug!("[RevisionDownStream]: receives new message: {:?}", ty); + log::debug!("[RevisionDownStream]: receives new message: {:?}", ty); match ty { WsDataType::PushRev => { let _ = self.editor.handle_push_rev(bytes).await?; @@ -115,10 +115,12 @@ pub(crate) struct RevisionUpStream { revisions: Arc, ws_sender: Arc, stop_rx: Option, + doc_id: String, } impl RevisionUpStream { pub(crate) fn new( + doc_id: &str, revisions: Arc, ws_sender: Arc, stop_rx: SteamStopRx, @@ -127,12 +129,14 @@ impl RevisionUpStream { revisions, ws_sender, stop_rx: Some(stop_rx), + doc_id: doc_id.to_owned(), } } pub async fn run(mut self) { let (tx, mut rx) = mpsc::unbounded_channel(); let mut stop_rx = self.stop_rx.take().expect("Only take once"); + let doc_id = self.doc_id.clone(); tokio::spawn(tick(tx)); let stream = stream! { loop { @@ -140,10 +144,13 @@ impl RevisionUpStream { result = rx.recv() => { match result { Some(msg) => yield msg, - None => break, + None => {}, } }, - _ = stop_rx.recv() => break, + _ = stop_rx.recv() => { + tracing::debug!("[RevisionUpStream:{}] loop exit", doc_id); + break + }, }; } }; @@ -167,7 +174,7 @@ impl RevisionUpStream { match self.revisions.next().await? { None => Ok(()), Some(record) => { - log::debug!( + tracing::debug!( "[RevisionUpStream]: processes revision: {}:{:?}", record.revision.doc_id, record.revision.rev_id diff --git a/frontend/rust-lib/flowy-document/tests/editor/revision_test.rs b/frontend/rust-lib/flowy-document/tests/editor/revision_test.rs index 12a4c73945..2a005c7426 100644 --- a/frontend/rust-lib/flowy-document/tests/editor/revision_test.rs +++ b/frontend/rust-lib/flowy-document/tests/editor/revision_test.rs @@ -1,13 +1,39 @@ use flowy_test::editor::{EditorScript::*, *}; +use lib_ot::revision::RevState; #[tokio::test] -async fn create_doc() { +async fn doc_rev_state_test1() { let scripts = vec![ InsertText("123", 0), - AssertRevId(1), - InsertText("456", 3), - AssertRevId(2), - AssertJson(r#"[{"insert":"123456\n"}]"#), + AssertCurrentRevId(1), + AssertRevisionState(1, RevState::Local), + SimulateAckedMessage(1), + AssertRevisionState(1, RevState::Acked), + AssertNextSendingRevision(None), + AssertJson(r#"[{"insert":"123\n"}]"#), + ]; + EditorTest::new().await.run_scripts(scripts).await; +} + +#[tokio::test] +async fn doc_rev_state_test2() { + let scripts = vec![ + InsertText("1", 0), + InsertText("2", 1), + InsertText("3", 2), + AssertCurrentRevId(3), + AssertRevisionState(1, RevState::Local), + AssertRevisionState(2, RevState::Local), + AssertRevisionState(3, RevState::Local), + SimulateAckedMessage(1), + AssertRevisionState(1, RevState::Acked), + AssertNextSendingRevision(Some(2)), + SimulateAckedMessage(2), + AssertRevisionState(2, RevState::Acked), + // + AssertNextSendingRevision(Some(3)), + AssertRevisionState(3, RevState::Local), + AssertJson(r#"[{"insert":"123\n"}]"#), ]; EditorTest::new().await.run_scripts(scripts).await; } diff --git a/frontend/rust-lib/flowy-test/src/editor.rs b/frontend/rust-lib/flowy-test/src/editor.rs index 45eaf35a58..414ae555d1 100644 --- a/frontend/rust-lib/flowy-test/src/editor.rs +++ b/frontend/rust-lib/flowy-test/src/editor.rs @@ -1,7 +1,11 @@ use crate::{helper::ViewTest, FlowySDKTest}; -use flowy_document::services::doc::edit::ClientDocEditor; -use flowy_document_infra::entities::doc::DocIdentifier; -use lib_ot::{core::Interval, revision::RevState, rich_text::RichTextDelta}; +use flowy_document::services::doc::{edit::ClientDocEditor, revision::RevisionIterator}; +use flowy_document_infra::entities::{doc::DocIdentifier, ws::WsDocumentDataBuilder}; +use lib_ot::{ + core::Interval, + revision::{RevState, Revision, RevisionRange}, + rich_text::RichTextDelta, +}; use std::sync::Arc; use tokio::time::{sleep, Duration}; @@ -25,14 +29,15 @@ impl EditorTest { self.run_script(script).await; } - sleep(Duration::from_secs(10)).await; + sleep(Duration::from_secs(5)).await; } async fn run_script(&mut self, script: EditorScript) { let rev_manager = self.editor.rev_manager(); let cache = rev_manager.revision_cache(); - let memory_cache = cache.memory_cache(); - let disk_cache = cache.dish_cache(); + let _memory_cache = cache.memory_cache(); + let _disk_cache = cache.dish_cache(); + let doc_id = self.editor.doc_id.clone(); match script { EditorScript::InsertText(s, offset) => { @@ -50,11 +55,29 @@ impl EditorTest { EditorScript::Redo() => { self.editor.redo().await.unwrap(); }, - EditorScript::AssertRevisionState(rev_id, state) => {}, - EditorScript::AssertNextSentRevision(rev_id, state) => {}, - EditorScript::AssertRevId(rev_id) => { + EditorScript::AssertRevisionState(rev_id, state) => { + let record = cache.query_revision(rev_id).await.unwrap(); + assert_eq!(record.state, state); + }, + EditorScript::AssertCurrentRevId(rev_id) => { assert_eq!(self.editor.rev_manager().rev_id(), rev_id); }, + EditorScript::AssertNextSendingRevision(rev_id) => { + let next_revision = cache.next().await.unwrap(); + if rev_id.is_none() { + assert_eq!(next_revision.is_none(), true); + } + + let next_revision = next_revision.unwrap(); + assert_eq!(next_revision.revision.rev_id, rev_id.unwrap()); + }, + EditorScript::SimulatePushRevisionMessage(_revision) => {}, + EditorScript::SimulatePullRevisionMessage(_range) => {}, + EditorScript::SimulateAckedMessage(i64) => { + let data = WsDocumentDataBuilder::build_acked_message(&doc_id, i64); + self.editor.handle_ws_message(data).await.unwrap(); + sleep(Duration::from_millis(200)).await; + }, EditorScript::AssertJson(expected) => { let expected_delta: RichTextDelta = serde_json::from_str(expected).unwrap(); let delta = self.editor.doc_delta().await.unwrap(); @@ -75,8 +98,11 @@ pub enum EditorScript { Replace(Interval, &'static str), Undo(), Redo(), + SimulatePushRevisionMessage(Revision), + SimulatePullRevisionMessage(RevisionRange), + SimulateAckedMessage(i64), AssertRevisionState(i64, RevState), - AssertNextSentRevision(i64, RevState), - AssertRevId(i64), + AssertNextSendingRevision(Option), + AssertCurrentRevId(i64), AssertJson(&'static str), } diff --git a/shared-lib/flowy-document-infra/src/entities/ws/ws.rs b/shared-lib/flowy-document-infra/src/entities/ws/ws.rs index 422f6591b3..6b3555ce16 100644 --- a/shared-lib/flowy-document-infra/src/entities/ws/ws.rs +++ b/shared-lib/flowy-document-infra/src/entities/ws/ws.rs @@ -1,7 +1,7 @@ use crate::{entities::doc::NewDocUser, errors::DocumentError}; use bytes::Bytes; use flowy_derive::{ProtoBuf, ProtoBuf_Enum}; -use lib_ot::revision::Revision; +use lib_ot::revision::{RevId, Revision, RevisionRange}; use std::convert::{TryFrom, TryInto}; #[derive(Debug, Clone, ProtoBuf_Enum, Eq, PartialEq, Hash)] @@ -11,7 +11,7 @@ pub enum WsDataType { // The frontend receives the PushRev event means the backend is pushing the new revision to frontend PushRev = 1, // The fronted receives the PullRev event means the backend try to pull the revision from frontend - PullRev = 2, // data should be Revision + PullRev = 2, Conflict = 3, NewDocUser = 4, } @@ -37,7 +37,6 @@ pub struct WsDocumentData { #[pb(index = 2)] pub ty: WsDataType, - // Opti: parse the data with type constraints #[pb(index = 3)] pub data: Vec, } @@ -65,3 +64,37 @@ impl std::convert::From for WsDocumentData { } } } + +pub struct WsDocumentDataBuilder(); +impl WsDocumentDataBuilder { + // WsDataType::PushRev -> Revision + pub fn build_push_rev_message(doc_id: &str, revision: Revision) -> WsDocumentData { + let bytes: Bytes = revision.try_into().unwrap(); + WsDocumentData { + doc_id: doc_id.to_string(), + ty: WsDataType::PushRev, + data: bytes.to_vec(), + } + } + + // WsDataType::PullRev -> RevisionRange + pub fn build_push_pull_message(doc_id: &str, range: RevisionRange) -> WsDocumentData { + let bytes: Bytes = range.try_into().unwrap(); + WsDocumentData { + doc_id: doc_id.to_string(), + ty: WsDataType::PullRev, + data: bytes.to_vec(), + } + } + + // WsDataType::Acked -> RevId + pub fn build_acked_message(doc_id: &str, rev_id: i64) -> WsDocumentData { + let rev_id: RevId = rev_id.into(); + let bytes: Bytes = rev_id.try_into().unwrap(); + WsDocumentData { + doc_id: doc_id.to_string(), + ty: WsDataType::Acked, + data: bytes.to_vec(), + } + } +} diff --git a/shared-lib/lib-ot/src/revision/cache.rs b/shared-lib/lib-ot/src/revision/cache.rs index 4013ca7337..4e2dd40673 100644 --- a/shared-lib/lib-ot/src/revision/cache.rs +++ b/shared-lib/lib-ot/src/revision/cache.rs @@ -2,7 +2,7 @@ use crate::{ errors::OTError, revision::{Revision, RevisionRange}, }; -use dashmap::{mapref::one::RefMut, DashMap}; +use dashmap::DashMap; use std::{collections::VecDeque, fmt::Debug, sync::Arc}; use tokio::sync::{broadcast, RwLock}; @@ -48,12 +48,15 @@ impl RevisionMemoryCache { pub fn remove_revisions(&self, ids: Vec) { self.revs_map.retain(|k, _| !ids.contains(k)); } - pub fn mut_revision(&self, rev_id: &i64, f: F) - where - F: Fn(RefMut), - { - if let Some(m_revision) = self.revs_map.get_mut(rev_id) { - f(m_revision) + pub async fn ack_revision(&self, rev_id: &i64) { + if let Some(mut m_revision) = self.revs_map.get_mut(rev_id) { + m_revision.value_mut().ack(); + match self.pending_revs.write().await.pop_front() { + None => log::error!("The pending_revs should not be empty"), + Some(cache_rev_id) => { + assert_eq!(&cache_rev_id, rev_id); + }, + } } else { log::error!("Can't find revision with id {}", rev_id); } @@ -90,6 +93,10 @@ impl RevisionMemoryCache { (ids, records) } + pub async fn query_revision(&self, rev_id: &i64) -> Option { + self.revs_map.get(&rev_id).map(|r| r.value().clone()) + } + pub async fn front_revision(&self) -> Option<(i64, RevisionRecord)> { match self.pending_revs.read().await.front() { None => None, @@ -103,7 +110,7 @@ impl RevisionMemoryCache { pub type RevIdReceiver = broadcast::Receiver; pub type RevIdSender = broadcast::Sender; -#[derive(Clone, Eq, PartialEq)] +#[derive(Debug, Clone, Eq, PartialEq)] pub enum RevState { Local = 0, Acked = 1,