chore: remove feature flag: filter

This commit is contained in:
appflowy 2022-07-20 18:27:12 +08:00
parent 0f4f51cc0c
commit e0db7bd4f9
13 changed files with 196 additions and 29 deletions

View File

@ -11,9 +11,9 @@ use flowy_database::ConnectionPool;
use flowy_error::{FlowyError, FlowyResult};
use flowy_folder_data_model::revision::{AppRevision, TrashRevision, ViewRevision, WorkspaceRevision};
use flowy_revision::disk::{RevisionRecord, RevisionState};
use flowy_revision::mk_revision_disk_cache;
use flowy_sync::client_folder::initial_folder_delta;
use flowy_revision::mk_text_block_revision_disk_cache;
use flowy_sync::{client_folder::FolderPad, entities::revision::Revision};
use lib_ot::core::PlainTextDeltaBuilder;
use std::sync::Arc;
use tokio::sync::RwLock;
pub use version_1::{app_sql::*, trash_sql::*, v1_impl::V1Transaction, view_sql::*, workspace_sql::*};
@ -109,16 +109,16 @@ impl FolderPersistence {
pub async fn save_folder(&self, user_id: &str, folder_id: &FolderId, folder: FolderPad) -> FlowyResult<()> {
let pool = self.database.db_pool()?;
let delta_data = initial_folder_delta(&folder)?.to_delta_bytes();
let md5 = folder.md5();
let revision = Revision::new(folder_id.as_ref(), 0, 0, delta_data, user_id, md5);
let json = folder.to_json()?;
let delta_data = PlainTextDeltaBuilder::new().insert(&json).build().to_delta_bytes();
let revision = Revision::initial_revision(user_id, folder_id.as_ref(), delta_data);
let record = RevisionRecord {
revision,
state: RevisionState::Sync,
write_to_disk: true,
};
let disk_cache = mk_revision_disk_cache(user_id, pool);
let disk_cache = mk_text_block_revision_disk_cache(user_id, pool);
disk_cache.delete_and_insert_records(folder_id.as_ref(), None, vec![record])
}
}

View File

@ -50,7 +50,6 @@ lib-infra = { path = "../../../shared-lib/lib-infra", features = ["protobuf_file
[features]
default = ["filter"]
default = []
dart = ["lib-infra/dart"]
filter = []
flowy_unit_test = ["flowy-revision/flowy_unit_test"]

View File

@ -2,6 +2,7 @@ use crate::services::block_revision_editor::GridBlockRevisionCompactor;
use crate::services::grid_editor::{GridRevisionCompactor, GridRevisionEditor};
use crate::services::persistence::block_index::BlockIndexCache;
use crate::services::persistence::kv::GridKVPersistence;
use crate::services::persistence::migration::GridMigration;
use crate::services::persistence::GridDatabase;
use crate::services::tasks::GridTaskScheduler;
use bytes::Bytes;
@ -31,6 +32,7 @@ pub struct GridManager {
#[allow(dead_code)]
kv_persistence: Arc<GridKVPersistence>,
task_scheduler: GridTaskSchedulerRwLock,
migration: GridMigration,
}
impl GridManager {
@ -41,17 +43,27 @@ impl GridManager {
) -> Self {
let grid_editors = Arc::new(DashMap::new());
let kv_persistence = Arc::new(GridKVPersistence::new(database.clone()));
let block_index_cache = Arc::new(BlockIndexCache::new(database));
let block_index_cache = Arc::new(BlockIndexCache::new(database.clone()));
let task_scheduler = GridTaskScheduler::new();
let migration = GridMigration::new(grid_user.clone(), database);
Self {
grid_editors,
grid_user,
kv_persistence,
block_index_cache,
task_scheduler,
migration,
}
}
pub async fn initialize_with_new_user(&self, _user_id: &str, _token: &str) -> FlowyResult<()> {
Ok(())
}
pub async fn initialize(&self, _user_id: &str, _token: &str) -> FlowyResult<()> {
Ok(())
}
#[tracing::instrument(level = "debug", skip_all, err)]
pub async fn create_grid<T: AsRef<str>>(&self, grid_id: T, revisions: RepeatedRevision) -> FlowyResult<()> {
let grid_id = grid_id.as_ref();
@ -74,6 +86,7 @@ impl GridManager {
pub async fn open_grid<T: AsRef<str>>(&self, grid_id: T) -> FlowyResult<Arc<GridRevisionEditor>> {
let grid_id = grid_id.as_ref();
tracing::Span::current().record("grid_id", &grid_id);
let _ = self.migration.migration_grid_if_need(grid_id).await;
self.get_or_create_grid_editor(grid_id).await
}

View File

@ -643,8 +643,8 @@ pub struct GridPadBuilder();
impl RevisionObjectBuilder for GridPadBuilder {
type Output = GridRevisionPad;
fn build_object(object_id: &str, revisions: Vec<Revision>) -> FlowyResult<Self::Output> {
let pad = GridRevisionPad::from_revisions(object_id, revisions)?;
fn build_object(_object_id: &str, revisions: Vec<Revision>) -> FlowyResult<Self::Output> {
let pad = GridRevisionPad::from_revisions(revisions)?;
Ok(pad)
}
}

View File

@ -7,6 +7,7 @@ use flowy_database::{
use flowy_error::FlowyResult;
use std::sync::Arc;
/// Allow getting the block id from row id.
pub struct BlockIndexCache {
database: Arc<dyn GridDatabase>,
}

View File

@ -0,0 +1,113 @@
use crate::manager::GridUser;
use crate::services::persistence::GridDatabase;
use flowy_database::kv::KV;
use flowy_error::FlowyResult;
use flowy_grid_data_model::revision::GridRevision;
use flowy_revision::disk::{RevisionRecord, SQLiteGridRevisionPersistence};
use flowy_revision::{mk_grid_block_revision_disk_cache, RevisionLoader, RevisionPersistence};
use flowy_sync::client_grid::{make_grid_rev_json_str, GridRevisionPad};
use flowy_sync::entities::revision::Revision;
use lib_ot::core::PlainTextDeltaBuilder;
use serde::{Deserialize, Serialize};
use std::str::FromStr;
use std::sync::Arc;
pub(crate) struct GridMigration {
user: Arc<dyn GridUser>,
database: Arc<dyn GridDatabase>,
}
impl GridMigration {
pub fn new(user: Arc<dyn GridUser>, database: Arc<dyn GridDatabase>) -> Self {
Self { user, database }
}
pub async fn migration_grid_if_need(&self, grid_id: &str) -> FlowyResult<()> {
match KV::get_str(grid_id) {
None => {
let _ = self.reset_grid_rev(grid_id).await?;
let _ = self.save_migrate_record(grid_id)?;
}
Some(s) => {
let mut record = MigrationGridRecord::from_str(&s)?;
let empty_json = self.empty_grid_rev_json()?;
if record.len < empty_json.len() {
let _ = self.reset_grid_rev(grid_id).await?;
record.len = empty_json.len();
KV::set_str(grid_id, record.to_string());
}
}
}
Ok(())
}
async fn reset_grid_rev(&self, grid_id: &str) -> FlowyResult<()> {
let user_id = self.user.user_id()?;
let pool = self.database.db_pool()?;
let grid_rev_pad = self.get_grid_revision_pad(grid_id).await?;
let json = grid_rev_pad.json_str()?;
let delta_data = PlainTextDeltaBuilder::new().insert(&json).build().to_delta_bytes();
let revision = Revision::initial_revision(&user_id, grid_id, delta_data);
let record = RevisionRecord::new(revision);
//
let disk_cache = mk_grid_block_revision_disk_cache(&user_id, pool);
let _ = disk_cache.delete_and_insert_records(grid_id, None, vec![record]);
Ok(())
}
fn save_migrate_record(&self, grid_id: &str) -> FlowyResult<()> {
let empty_json_str = self.empty_grid_rev_json()?;
let record = MigrationGridRecord {
grid_id: grid_id.to_owned(),
len: empty_json_str.len(),
};
KV::set_str(grid_id, record.to_string());
Ok(())
}
fn empty_grid_rev_json(&self) -> FlowyResult<String> {
let empty_grid_rev = GridRevision::default();
let empty_json = make_grid_rev_json_str(&empty_grid_rev)?;
Ok(empty_json)
}
async fn get_grid_revision_pad(&self, grid_id: &str) -> FlowyResult<GridRevisionPad> {
let pool = self.database.db_pool()?;
let user_id = self.user.user_id()?;
let disk_cache = SQLiteGridRevisionPersistence::new(&user_id, pool);
let rev_persistence = Arc::new(RevisionPersistence::new(&user_id, grid_id, disk_cache));
let (revisions, _) = RevisionLoader {
object_id: grid_id.to_owned(),
user_id,
cloud: None,
rev_persistence,
}
.load()
.await?;
let pad = GridRevisionPad::from_revisions(revisions)?;
Ok(pad)
}
}
#[derive(Serialize, Deserialize)]
struct MigrationGridRecord {
grid_id: String,
len: usize,
}
impl FromStr for MigrationGridRecord {
type Err = serde_json::Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
serde_json::from_str::<MigrationGridRecord>(s)
}
}
impl ToString for MigrationGridRecord {
fn to_string(&self) -> String {
serde_json::to_string(self).unwrap_or_else(|_| "".to_string())
}
}

View File

@ -4,6 +4,7 @@ use std::sync::Arc;
pub mod block_index;
pub mod kv;
pub mod migration;
pub trait GridDatabase: Send + Sync {
fn db_pool(&self) -> Result<Arc<ConnectionPool>, FlowyError>;

View File

@ -53,6 +53,14 @@ pub struct RevisionRecord {
}
impl RevisionRecord {
pub fn new(revision: Revision) -> Self {
Self {
revision,
state: RevisionState::Sync,
write_to_disk: true,
}
}
pub fn ack(&mut self) {
self.state = RevisionState::Ack;
}
@ -64,6 +72,8 @@ pub struct RevisionChangeset {
pub(crate) state: RevisionState,
}
/// Sync: revision is not synced to the server
/// Ack: revision is synced to the server
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum RevisionState {
Sync = 0,

View File

@ -2,7 +2,7 @@ use crate::cache::{
disk::{RevisionChangeset, RevisionDiskCache, SQLiteTextBlockRevisionPersistence},
memory::RevisionMemoryCacheDelegate,
};
use crate::disk::{RevisionRecord, RevisionState};
use crate::disk::{RevisionRecord, RevisionState, SQLiteGridBlockRevisionPersistence};
use crate::memory::RevisionMemoryCache;
use crate::RevisionCompactor;
use flowy_database::ConnectionPool;
@ -214,13 +214,20 @@ impl RevisionPersistence {
}
}
pub fn mk_revision_disk_cache(
pub fn mk_text_block_revision_disk_cache(
user_id: &str,
pool: Arc<ConnectionPool>,
) -> Arc<dyn RevisionDiskCache<Error = FlowyError>> {
Arc::new(SQLiteTextBlockRevisionPersistence::new(user_id, pool))
}
pub fn mk_grid_block_revision_disk_cache(
user_id: &str,
pool: Arc<ConnectionPool>,
) -> Arc<dyn RevisionDiskCache<Error = FlowyError>> {
Arc::new(SQLiteGridBlockRevisionPersistence::new(user_id, pool))
}
impl RevisionMemoryCacheDelegate for Arc<dyn RevisionDiskCache<Error = FlowyError>> {
fn checkpoint_tick(&self, mut records: Vec<RevisionRecord>) -> FlowyResult<()> {
records.retain(|record| record.write_to_disk);

View File

@ -16,14 +16,23 @@ use std::sync::Arc;
pub struct GridDepsResolver();
impl GridDepsResolver {
pub fn resolve(ws_conn: Arc<FlowyWebSocketConnect>, user_session: Arc<UserSession>) -> Arc<GridManager> {
pub async fn resolve(ws_conn: Arc<FlowyWebSocketConnect>, user_session: Arc<UserSession>) -> Arc<GridManager> {
let user = Arc::new(GridUserImpl(user_session.clone()));
let rev_web_socket = Arc::new(GridWebSocket(ws_conn));
Arc::new(GridManager::new(
user,
let grid_manager = Arc::new(GridManager::new(
user.clone(),
rev_web_socket,
Arc::new(GridDatabaseImpl(user_session)),
))
));
if let (Ok(user_id), Ok(token)) = (user.user_id(), user.token()) {
match grid_manager.initialize(&user_id, &token).await {
Ok(_) => {}
Err(e) => tracing::error!("Initialize grid manager failed: {}", e),
}
}
grid_manager
}
}

View File

@ -112,7 +112,7 @@ impl FlowySDK {
&config.server_config,
);
let grid_manager = GridDepsResolver::resolve(ws_conn.clone(), user_session.clone());
let grid_manager = GridDepsResolver::resolve(ws_conn.clone(), user_session.clone()).await;
let folder_manager = FolderDepsResolver::resolve(
local_server.clone(),
@ -147,7 +147,7 @@ impl FlowySDK {
)
}));
_start_listening(&dispatcher, &ws_conn, &user_session, &folder_manager);
_start_listening(&dispatcher, &ws_conn, &user_session, &folder_manager, &grid_manager);
Self {
config,
@ -171,10 +171,12 @@ fn _start_listening(
ws_conn: &Arc<FlowyWebSocketConnect>,
user_session: &Arc<UserSession>,
folder_manager: &Arc<FolderManager>,
grid_manager: &Arc<GridManager>,
) {
let subscribe_user_status = user_session.notifier.subscribe_user_status();
let subscribe_network_type = ws_conn.subscribe_network_ty();
let folder_manager = folder_manager.clone();
let grid_manager = grid_manager.clone();
let cloned_folder_manager = folder_manager.clone();
let ws_conn = ws_conn.clone();
let user_session = user_session.clone();
@ -182,7 +184,13 @@ fn _start_listening(
dispatch.spawn(async move {
user_session.init();
listen_on_websocket(ws_conn.clone());
_listen_user_status(ws_conn.clone(), subscribe_user_status, folder_manager.clone()).await;
_listen_user_status(
ws_conn.clone(),
subscribe_user_status,
folder_manager.clone(),
grid_manager.clone(),
)
.await;
});
dispatch.spawn(async move {
@ -209,6 +217,7 @@ async fn _listen_user_status(
ws_conn: Arc<FlowyWebSocketConnect>,
mut subscribe: broadcast::Receiver<UserStatus>,
folder_manager: Arc<FolderManager>,
grid_manager: Arc<GridManager>,
) {
while let Ok(status) = subscribe.recv().await {
let result = || async {
@ -216,6 +225,7 @@ async fn _listen_user_status(
UserStatus::Login { token, user_id } => {
tracing::trace!("User did login");
let _ = folder_manager.initialize(&user_id, &token).await?;
let _ = grid_manager.initialize(&user_id, &token).await?;
let _ = ws_conn.start(token, user_id).await?;
}
UserStatus::Logout { .. } => {
@ -233,6 +243,11 @@ async fn _listen_user_status(
let _ = folder_manager
.initialize_with_new_user(&profile.id, &profile.token)
.await?;
let _ = grid_manager
.initialize_with_new_user(&profile.id, &profile.token)
.await?;
let _ = ws_conn.start(profile.token.clone(), profile.id.clone()).await?;
let _ = ret.send(());
}

View File

@ -31,13 +31,8 @@ pub struct GridRevision {
pub fields: Vec<Arc<FieldRevision>>,
pub blocks: Vec<Arc<GridBlockMetaRevision>>,
#[cfg(feature = "filter")]
#[serde(default)]
pub setting: GridSettingRevision,
#[cfg(not(feature = "filter"))]
#[serde(default, skip)]
pub setting: GridSettingRevision,
}
impl GridRevision {

View File

@ -62,7 +62,7 @@ impl GridRevisionPad {
})
}
pub fn from_revisions(_grid_id: &str, revisions: Vec<Revision>) -> CollaborateResult<Self> {
pub fn from_revisions(revisions: Vec<Revision>) -> CollaborateResult<Self> {
let grid_delta: GridRevisionDelta = make_delta_from_revisions::<PlainTextAttributes>(revisions)?;
Self::from_delta(grid_delta)
}
@ -480,8 +480,8 @@ impl GridRevisionPad {
match f(Arc::make_mut(&mut self.grid_rev))? {
None => Ok(None),
Some(_) => {
let old = json_from_grid(&cloned_grid)?;
let new = json_from_grid(&self.grid_rev)?;
let old = make_grid_rev_json_str(&cloned_grid)?;
let new = self.json_str()?;
match cal_diff::<PlainTextAttributes>(old, new) {
None => Ok(None),
Some(delta) => {
@ -528,9 +528,13 @@ impl GridRevisionPad {
},
)
}
pub fn json_str(&self) -> CollaborateResult<String> {
make_grid_rev_json_str(&self.grid_rev)
}
}
fn json_from_grid(grid: &Arc<GridRevision>) -> CollaborateResult<String> {
pub fn make_grid_rev_json_str(grid: &GridRevision) -> CollaborateResult<String> {
let json = serde_json::to_string(grid)
.map_err(|err| internal_error(format!("Serialize grid to json str failed. {:?}", err)))?;
Ok(json)