chore: add ref count map

This commit is contained in:
nathan 2022-11-08 09:30:10 +08:00
parent de4c1b24ef
commit ebdd28cf1c
6 changed files with 136 additions and 50 deletions

View File

@ -42,7 +42,7 @@ pub(crate) async fn update_grid_setting_handler(
) -> Result<(), FlowyError> {
let params: GridSettingChangesetParams = data.into_inner().try_into()?;
let editor = manager.get_grid_editor(&params.grid_id)?;
let editor = manager.get_grid_editor(&params.grid_id).await?;
if let Some(insert_params) = params.insert_group {
let _ = editor.insert_group(insert_params).await?;
}
@ -67,7 +67,7 @@ pub(crate) async fn get_grid_blocks_handler(
manager: AppData<Arc<GridManager>>,
) -> DataResult<RepeatedBlockPB, FlowyError> {
let params: QueryGridBlocksParams = data.into_inner().try_into()?;
let editor = manager.get_grid_editor(&params.grid_id)?;
let editor = manager.get_grid_editor(&params.grid_id).await?;
let repeated_grid_block = editor.get_blocks(Some(params.block_ids)).await?;
data_result(repeated_grid_block)
}
@ -78,7 +78,7 @@ pub(crate) async fn get_fields_handler(
manager: AppData<Arc<GridManager>>,
) -> DataResult<RepeatedFieldPB, FlowyError> {
let params: QueryFieldParams = data.into_inner().try_into()?;
let editor = manager.get_grid_editor(&params.grid_id)?;
let editor = manager.get_grid_editor(&params.grid_id).await?;
let field_orders = params
.field_ids
.items
@ -96,7 +96,7 @@ pub(crate) async fn update_field_handler(
manager: AppData<Arc<GridManager>>,
) -> Result<(), FlowyError> {
let changeset: FieldChangesetParams = data.into_inner().try_into()?;
let editor = manager.get_grid_editor(&changeset.grid_id)?;
let editor = manager.get_grid_editor(&changeset.grid_id).await?;
let _ = editor.update_field(changeset).await?;
Ok(())
}
@ -107,7 +107,7 @@ pub(crate) async fn update_field_type_option_handler(
manager: AppData<Arc<GridManager>>,
) -> Result<(), FlowyError> {
let params: UpdateFieldTypeOptionParams = data.into_inner().try_into()?;
let editor = manager.get_grid_editor(&params.grid_id)?;
let editor = manager.get_grid_editor(&params.grid_id).await?;
let _ = editor
.update_field_type_option(&params.grid_id, &params.field_id, params.type_option_data)
.await?;
@ -120,7 +120,7 @@ pub(crate) async fn delete_field_handler(
manager: AppData<Arc<GridManager>>,
) -> Result<(), FlowyError> {
let params: FieldIdParams = data.into_inner().try_into()?;
let editor = manager.get_grid_editor(&params.grid_id)?;
let editor = manager.get_grid_editor(&params.grid_id).await?;
let _ = editor.delete_field(&params.field_id).await?;
Ok(())
}
@ -131,7 +131,7 @@ pub(crate) async fn switch_to_field_handler(
manager: AppData<Arc<GridManager>>,
) -> Result<(), FlowyError> {
let params: EditFieldParams = data.into_inner().try_into()?;
let editor = manager.get_grid_editor(&params.grid_id)?;
let editor = manager.get_grid_editor(&params.grid_id).await?;
editor
.switch_to_field_type(&params.field_id, &params.field_type)
.await?;
@ -157,7 +157,7 @@ pub(crate) async fn duplicate_field_handler(
manager: AppData<Arc<GridManager>>,
) -> Result<(), FlowyError> {
let params: FieldIdParams = data.into_inner().try_into()?;
let editor = manager.get_grid_editor(&params.grid_id)?;
let editor = manager.get_grid_editor(&params.grid_id).await?;
let _ = editor.duplicate_field(&params.field_id).await?;
Ok(())
}
@ -169,7 +169,7 @@ pub(crate) async fn get_field_type_option_data_handler(
manager: AppData<Arc<GridManager>>,
) -> DataResult<FieldTypeOptionDataPB, FlowyError> {
let params: FieldTypeOptionIdParams = data.into_inner().try_into()?;
let editor = manager.get_grid_editor(&params.grid_id)?;
let editor = manager.get_grid_editor(&params.grid_id).await?;
match editor.get_field_rev(&params.field_id).await {
None => Err(FlowyError::record_not_found()),
Some(field_rev) => {
@ -192,7 +192,7 @@ pub(crate) async fn create_field_type_option_data_handler(
manager: AppData<Arc<GridManager>>,
) -> DataResult<FieldTypeOptionDataPB, FlowyError> {
let params: CreateFieldParams = data.into_inner().try_into()?;
let editor = manager.get_grid_editor(&params.grid_id)?;
let editor = manager.get_grid_editor(&params.grid_id).await?;
let field_rev = editor
.create_new_field_rev(&params.field_type, params.type_option_data)
.await?;
@ -212,7 +212,7 @@ pub(crate) async fn move_field_handler(
manager: AppData<Arc<GridManager>>,
) -> Result<(), FlowyError> {
let params: MoveFieldParams = data.into_inner().try_into()?;
let editor = manager.get_grid_editor(&params.grid_id)?;
let editor = manager.get_grid_editor(&params.grid_id).await?;
let _ = editor.move_field(params).await?;
Ok(())
}
@ -237,7 +237,7 @@ pub(crate) async fn get_row_handler(
manager: AppData<Arc<GridManager>>,
) -> DataResult<OptionalRowPB, FlowyError> {
let params: RowIdParams = data.into_inner().try_into()?;
let editor = manager.get_grid_editor(&params.grid_id)?;
let editor = manager.get_grid_editor(&params.grid_id).await?;
let row = editor.get_row_rev(&params.row_id).await?.map(make_row_from_row_rev);
data_result(OptionalRowPB { row })
@ -249,7 +249,7 @@ pub(crate) async fn delete_row_handler(
manager: AppData<Arc<GridManager>>,
) -> Result<(), FlowyError> {
let params: RowIdParams = data.into_inner().try_into()?;
let editor = manager.get_grid_editor(&params.grid_id)?;
let editor = manager.get_grid_editor(&params.grid_id).await?;
let _ = editor.delete_row(&params.row_id).await?;
Ok(())
}
@ -260,7 +260,7 @@ pub(crate) async fn duplicate_row_handler(
manager: AppData<Arc<GridManager>>,
) -> Result<(), FlowyError> {
let params: RowIdParams = data.into_inner().try_into()?;
let editor = manager.get_grid_editor(&params.grid_id)?;
let editor = manager.get_grid_editor(&params.grid_id).await?;
let _ = editor.duplicate_row(&params.row_id).await?;
Ok(())
}
@ -271,7 +271,7 @@ pub(crate) async fn move_row_handler(
manager: AppData<Arc<GridManager>>,
) -> Result<(), FlowyError> {
let params: MoveRowParams = data.into_inner().try_into()?;
let editor = manager.get_grid_editor(&params.view_id)?;
let editor = manager.get_grid_editor(&params.view_id).await?;
let _ = editor.move_row(params).await?;
Ok(())
}
@ -282,7 +282,7 @@ pub(crate) async fn create_table_row_handler(
manager: AppData<Arc<GridManager>>,
) -> DataResult<RowPB, FlowyError> {
let params: CreateRowParams = data.into_inner().try_into()?;
let editor = manager.get_grid_editor(params.grid_id.as_ref())?;
let editor = manager.get_grid_editor(params.grid_id.as_ref()).await?;
let row = editor.create_row(params).await?;
data_result(row)
}
@ -293,7 +293,7 @@ pub(crate) async fn get_cell_handler(
manager: AppData<Arc<GridManager>>,
) -> DataResult<GridCellPB, FlowyError> {
let params: GridCellIdParams = data.into_inner().try_into()?;
let editor = manager.get_grid_editor(&params.grid_id)?;
let editor = manager.get_grid_editor(&params.grid_id).await?;
match editor.get_cell(&params).await {
None => data_result(GridCellPB::empty(&params.field_id)),
Some(cell) => data_result(cell),
@ -306,7 +306,7 @@ pub(crate) async fn update_cell_handler(
manager: AppData<Arc<GridManager>>,
) -> Result<(), FlowyError> {
let changeset: CellChangesetPB = data.into_inner();
let editor = manager.get_grid_editor(&changeset.grid_id)?;
let editor = manager.get_grid_editor(&changeset.grid_id).await?;
let _ = editor.update_cell(changeset).await?;
Ok(())
}
@ -317,7 +317,7 @@ pub(crate) async fn new_select_option_handler(
manager: AppData<Arc<GridManager>>,
) -> DataResult<SelectOptionPB, FlowyError> {
let params: CreateSelectOptionParams = data.into_inner().try_into()?;
let editor = manager.get_grid_editor(&params.grid_id)?;
let editor = manager.get_grid_editor(&params.grid_id).await?;
match editor.get_field_rev(&params.field_id).await {
None => Err(ErrorCode::InvalidData.into()),
Some(field_rev) => {
@ -334,7 +334,7 @@ pub(crate) async fn update_select_option_handler(
manager: AppData<Arc<GridManager>>,
) -> Result<(), FlowyError> {
let changeset: SelectOptionChangeset = data.into_inner().try_into()?;
let editor = manager.get_grid_editor(&changeset.cell_identifier.grid_id)?;
let editor = manager.get_grid_editor(&changeset.cell_identifier.grid_id).await?;
let _ = editor
.modify_field_rev(&changeset.cell_identifier.field_id, |field_rev| {
@ -391,7 +391,7 @@ pub(crate) async fn get_select_option_handler(
manager: AppData<Arc<GridManager>>,
) -> DataResult<SelectOptionCellDataPB, FlowyError> {
let params: GridCellIdParams = data.into_inner().try_into()?;
let editor = manager.get_grid_editor(&params.grid_id)?;
let editor = manager.get_grid_editor(&params.grid_id).await?;
match editor.get_field_rev(&params.field_id).await {
None => {
tracing::error!("Can't find the select option field with id: {}", params.field_id);
@ -420,7 +420,7 @@ pub(crate) async fn update_select_option_cell_handler(
manager: AppData<Arc<GridManager>>,
) -> Result<(), FlowyError> {
let params: SelectOptionCellChangesetParams = data.into_inner().try_into()?;
let editor = manager.get_grid_editor(&params.cell_identifier.grid_id)?;
let editor = manager.get_grid_editor(&params.cell_identifier.grid_id).await?;
let _ = editor.update_cell(params.into()).await?;
Ok(())
}
@ -431,7 +431,7 @@ pub(crate) async fn update_date_cell_handler(
manager: AppData<Arc<GridManager>>,
) -> Result<(), FlowyError> {
let params: DateChangesetParams = data.into_inner().try_into()?;
let editor = manager.get_grid_editor(&params.cell_identifier.grid_id)?;
let editor = manager.get_grid_editor(&params.cell_identifier.grid_id).await?;
let _ = editor.update_cell(params.into()).await?;
Ok(())
}
@ -442,7 +442,7 @@ pub(crate) async fn get_groups_handler(
manager: AppData<Arc<GridManager>>,
) -> DataResult<RepeatedGridGroupPB, FlowyError> {
let params: GridIdPB = data.into_inner();
let editor = manager.get_grid_editor(&params.value)?;
let editor = manager.get_grid_editor(&params.value).await?;
let group = editor.load_groups().await?;
data_result(group)
}
@ -453,7 +453,7 @@ pub(crate) async fn create_board_card_handler(
manager: AppData<Arc<GridManager>>,
) -> DataResult<RowPB, FlowyError> {
let params: CreateRowParams = data.into_inner().try_into()?;
let editor = manager.get_grid_editor(params.grid_id.as_ref())?;
let editor = manager.get_grid_editor(params.grid_id.as_ref()).await?;
let row = editor.create_row(params).await?;
data_result(row)
}
@ -464,7 +464,7 @@ pub(crate) async fn move_group_handler(
manager: AppData<Arc<GridManager>>,
) -> FlowyResult<()> {
let params: MoveGroupParams = data.into_inner().try_into()?;
let editor = manager.get_grid_editor(params.view_id.as_ref())?;
let editor = manager.get_grid_editor(params.view_id.as_ref()).await?;
let _ = editor.move_group(params).await?;
Ok(())
}
@ -475,7 +475,7 @@ pub(crate) async fn move_group_row_handler(
manager: AppData<Arc<GridManager>>,
) -> FlowyResult<()> {
let params: MoveGroupRowParams = data.into_inner().try_into()?;
let editor = manager.get_grid_editor(params.view_id.as_ref())?;
let editor = manager.get_grid_editor(params.view_id.as_ref()).await?;
let _ = editor.move_group_row(params).await?;
Ok(())
}

View File

@ -19,6 +19,8 @@ use flowy_revision::{
};
use flowy_sync::client_grid::{make_grid_block_operations, make_grid_operations, make_grid_view_operations};
use flowy_sync::entities::revision::Revision;
use lib_infra::ref_map::{RefCountHashMap, RefCountValue};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
@ -31,7 +33,7 @@ pub trait GridUser: Send + Sync {
pub type GridTaskSchedulerRwLock = Arc<RwLock<GridTaskScheduler>>;
pub struct GridManager {
grid_editors: Arc<DashMap<String, Arc<GridRevisionEditor>>>,
grid_editors: RwLock<RefCountHashMap<Arc<GridRevisionEditor>>>,
grid_user: Arc<dyn GridUser>,
block_index_cache: Arc<BlockIndexCache>,
#[allow(dead_code)]
@ -46,7 +48,7 @@ impl GridManager {
_rev_web_socket: Arc<dyn RevisionWebSocket>,
database: Arc<dyn GridDatabase>,
) -> Self {
let grid_editors = Arc::new(DashMap::new());
let grid_editors = RwLock::new(RefCountHashMap::new());
let kv_persistence = Arc::new(GridKVPersistence::new(database.clone()));
let block_index_cache = Arc::new(BlockIndexCache::new(database.clone()));
let task_scheduler = GridTaskScheduler::new();
@ -107,35 +109,33 @@ impl GridManager {
pub async fn close_grid<T: AsRef<str>>(&self, grid_id: T) -> FlowyResult<()> {
let grid_id = grid_id.as_ref();
tracing::Span::current().record("grid_id", &grid_id);
self.grid_editors.remove(grid_id);
self.grid_editors.write().await.remove(grid_id);
self.task_scheduler.write().await.unregister_handler(grid_id);
Ok(())
}
// #[tracing::instrument(level = "debug", skip(self), err)]
pub fn get_grid_editor(&self, grid_id: &str) -> FlowyResult<Arc<GridRevisionEditor>> {
match self.grid_editors.get(grid_id) {
pub async fn get_grid_editor(&self, grid_id: &str) -> FlowyResult<Arc<GridRevisionEditor>> {
match self.grid_editors.read().await.get(grid_id) {
None => Err(FlowyError::internal().context("Should call open_grid function first")),
Some(editor) => Ok(editor.clone()),
}
}
async fn get_or_create_grid_editor(&self, grid_id: &str) -> FlowyResult<Arc<GridRevisionEditor>> {
match self.grid_editors.get(grid_id) {
None => {
if let Some(editor) = self.grid_editors.get(grid_id) {
tracing::warn!("Grid:{} already open", grid_id);
Ok(editor.clone())
} else {
let db_pool = self.grid_user.db_pool()?;
let editor = self.make_grid_rev_editor(grid_id, db_pool).await?;
self.grid_editors.insert(grid_id.to_string(), editor.clone());
self.task_scheduler.write().await.register_handler(editor.clone());
Ok(editor)
}
}
Some(editor) => Ok(editor.clone()),
if let Some(editor) = self.grid_editors.read().await.get(grid_id) {
return Ok(editor.clone());
}
let db_pool = self.grid_user.db_pool()?;
let editor = self.make_grid_rev_editor(grid_id, db_pool).await?;
self.grid_editors
.write()
.await
.insert(grid_id.to_string(), editor.clone());
self.task_scheduler.write().await.register_handler(editor.clone());
Ok(editor)
}
#[tracing::instrument(level = "trace", skip(self, pool), err)]
@ -240,3 +240,9 @@ pub async fn make_grid_view_data(
Ok(grid_rev_delta_bytes)
}
impl RefCountValue for GridRevisionEditor {
fn did_remove(&self) {
self.close();
}
}

View File

@ -94,6 +94,8 @@ impl GridRevisionEditor {
Ok(editor)
}
pub fn close(&self) {}
/// Save the type-option data to disk and send a `GridNotification::DidUpdateField` notification
/// to dart side.
///

View File

@ -6,12 +6,13 @@ use crate::services::tasks::task::Task;
use crate::services::tasks::{TaskContent, TaskId, TaskStatus};
use flowy_error::FlowyError;
use lib_infra::future::BoxResultFuture;
use lib_infra::ref_map::{RefCountHashMap, RefCountValue};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{watch, RwLock};
pub(crate) trait GridTaskHandler: Send + Sync + 'static {
pub(crate) trait GridTaskHandler: Send + Sync + 'static + RefCountValue {
fn handler_id(&self) -> &str;
fn process_content(&self, content: TaskContent) -> BoxResultFuture<(), FlowyError>;
@ -21,7 +22,7 @@ pub struct GridTaskScheduler {
queue: GridTaskQueue,
store: GridTaskStore,
notifier: watch::Sender<bool>,
handlers: HashMap<TaskHandlerId, Arc<dyn GridTaskHandler>>,
handlers: RefCountHashMap<Arc<dyn GridTaskHandler>>,
}
impl GridTaskScheduler {
@ -32,7 +33,7 @@ impl GridTaskScheduler {
queue: GridTaskQueue::new(),
store: GridTaskStore::new(),
notifier,
handlers: HashMap::new(),
handlers: RefCountHashMap::new(),
};
// The runner will receive the newest value after start running.
scheduler.notify();
@ -54,7 +55,7 @@ impl GridTaskScheduler {
}
pub(crate) fn unregister_handler<T: AsRef<str>>(&mut self, handler_id: T) {
let _ = self.handlers.remove(handler_id.as_ref());
self.handlers.remove(handler_id.as_ref());
}
#[allow(dead_code)]
@ -110,6 +111,7 @@ mod tests {
use crate::services::tasks::{GridTaskHandler, GridTaskScheduler, Task, TaskContent, TaskStatus};
use flowy_error::FlowyError;
use lib_infra::future::BoxResultFuture;
use lib_infra::ref_map::RefCountValue;
use std::sync::Arc;
use std::time::Duration;
use tokio::time::interval;
@ -169,6 +171,11 @@ mod tests {
assert_eq!(rx_2.await.unwrap().status, TaskStatus::Done);
}
struct MockGridTaskHandler();
impl RefCountValue for MockGridTaskHandler {
fn did_remove(&self) {}
}
impl GridTaskHandler for MockGridTaskHandler {
fn handler_id(&self) -> &str {
"1"

View File

@ -1,4 +1,5 @@
pub mod code_gen;
pub mod future;
pub mod ref_map;
pub mod retry;
pub mod util;

View File

@ -0,0 +1,70 @@
use std::collections::HashMap;
use std::sync::Arc;
pub trait RefCountValue {
fn did_remove(&self);
}
struct RefCountHandler<T> {
ref_count: usize,
inner: T,
}
impl<T> RefCountHandler<T> {
pub fn new(inner: T) -> Self {
Self { ref_count: 1, inner }
}
pub fn increase_ref_count(&mut self) {
self.ref_count += 1;
}
}
pub struct RefCountHashMap<T>(HashMap<String, RefCountHandler<T>>);
impl<T> RefCountHashMap<T>
where
T: Clone + Send + Sync + RefCountValue,
{
pub fn new() -> Self {
Self(Default::default())
}
pub fn get(&self, key: &str) -> Option<T> {
self.0.get(key).and_then(|handler| Some(handler.inner.clone()))
}
pub fn insert(&mut self, key: String, value: T) {
if let Some(handler) = self.0.get_mut(&key) {
handler.increase_ref_count();
} else {
let handler = RefCountHandler::new(value);
self.0.insert(key, handler);
}
}
pub fn remove(&mut self, key: &str) {
let mut should_remove = false;
if let Some(value) = self.0.get_mut(key) {
if value.ref_count > 0 {
value.ref_count -= 1;
}
should_remove = value.ref_count == 0;
}
if should_remove {
if let Some(handler) = self.0.remove(key) {
handler.inner.did_remove();
}
}
}
}
impl<T> RefCountValue for Arc<T>
where
T: RefCountValue,
{
fn did_remove(&self) {
(**self).did_remove()
}
}