chore: gen filter task

This commit is contained in:
appflowy 2022-06-29 20:51:53 +08:00
parent 6554b8354b
commit 4d8f101a42
7 changed files with 47 additions and 17 deletions

View File

@ -3,28 +3,26 @@ use crate::services::block_manager::GridBlockManager;
use crate::services::tasks::Task;
use flowy_error::FlowyResult;
use crate::services::grid_editor_task::GridServiceTaskScheduler;
use flowy_sync::client_grid::GridRevisionPad;
use std::sync::Arc;
use tokio::sync::RwLock;
pub(crate) struct GridFilterService {
#[allow(dead_code)]
scheduler: GridTaskSchedulerRwLock,
#[allow(dead_code)]
scheduler: Arc<dyn GridServiceTaskScheduler>,
grid_pad: Arc<RwLock<GridRevisionPad>>,
#[allow(dead_code)]
block_manager: Arc<GridBlockManager>,
}
impl GridFilterService {
pub fn new(
pub fn new<S: GridServiceTaskScheduler>(
grid_pad: Arc<RwLock<GridRevisionPad>>,
block_manager: Arc<GridBlockManager>,
scheduler: GridTaskSchedulerRwLock,
scheduler: S,
) -> Self {
Self {
grid_pad,
block_manager,
scheduler,
scheduler: Arc::new(scheduler),
}
}
@ -33,6 +31,8 @@ impl GridFilterService {
}
pub async fn notify_changed(&self) {
let task_id = self.scheduler.gen_task_id().await;
//
// let grid_pad = self.grid_pad.read().await;
// match grid_pad.get_filters(None) {

View File

@ -475,7 +475,10 @@ impl GridRevisionEditor {
.await?;
if is_filter_changed {
self.filter_service.notify_changed().await;
let filter_service = self.filter_service.clone();
tokio::spawn(async move {
filter_service.notify_changed().await;
});
}
Ok(())
}

View File

@ -1,9 +1,15 @@
use crate::manager::GridTaskSchedulerRwLock;
use crate::services::grid_editor::GridRevisionEditor;
use crate::services::tasks::{GridTaskHandler, Task, TaskContent, TaskHandlerId};
use crate::services::tasks::{GridTaskHandler, Task, TaskContent, TaskHandlerId, TaskId};
use flowy_error::FlowyError;
use futures::future::BoxFuture;
use lib_infra::future::BoxResultFuture;
pub trait GridServiceTaskScheduler: Send + Sync + 'static {
fn gen_task_id(&self) -> BoxFuture<TaskId>;
fn register_task(&self, task: Task) -> BoxFuture<()>;
}
impl GridTaskHandler for GridRevisionEditor {
fn handler_id(&self) -> &TaskHandlerId {
&self.grid_id
@ -13,9 +19,23 @@ impl GridTaskHandler for GridRevisionEditor {
Box::pin(async move {
match &task.content {
TaskContent::Snapshot { .. } => {}
TaskContent::Filter => self.filter_service.process_task(task).await?,
TaskContent::Filter { .. } => self.filter_service.process_task(task).await?,
}
Ok(())
})
}
}
impl GridServiceTaskScheduler for GridTaskSchedulerRwLock {
fn gen_task_id(&self) -> BoxFuture<TaskId> {
let this = self.clone();
Box::pin(async move { this.read().await.next_task_id() })
}
fn register_task(&self, task: Task) -> BoxFuture<()> {
let this = self.clone();
Box::pin(async move {
this.write().await.register_task(task);
})
}
}

View File

@ -22,7 +22,7 @@ impl GridTaskQueue {
pub(crate) fn push(&mut self, task: &Task) {
let task_type = match task.content {
TaskContent::Snapshot { .. } => TaskType::Snapshot,
TaskContent::Filter => TaskType::Filter,
TaskContent::Filter { .. } => TaskType::Filter,
};
let pending_task = PendingTask {
ty: task_type,

View File

@ -3,6 +3,7 @@ use crate::services::tasks::runner::GridTaskRunner;
use crate::services::tasks::store::GridTaskStore;
use crate::services::tasks::task::Task;
use crate::services::tasks::TaskId;
use flowy_error::{FlowyError, FlowyResult};
use lib_infra::future::BoxResultFuture;
use std::collections::HashMap;
@ -81,6 +82,10 @@ impl GridTaskScheduler {
self.notify();
}
pub fn next_task_id(&self) -> TaskId {
self.store.next_task_id()
}
pub fn notify(&self) {
let _ = self.notifier.send(());
}

View File

@ -24,7 +24,7 @@ impl GridTaskStore {
pub fn remove_task(&mut self, task_id: &TaskId) -> Option<Task> {
self.tasks.remove(task_id)
}
#[allow(dead_code)]
pub fn next_task_id(&self) -> TaskId {
let _ = self.task_id_counter.fetch_add(1, SeqCst);
self.task_id_counter.load(SeqCst)

View File

@ -44,16 +44,18 @@ impl Ord for PendingTask {
(TaskType::Snapshot, TaskType::Snapshot) => Ordering::Equal,
(TaskType::Snapshot, _) => Ordering::Greater,
(_, TaskType::Snapshot) => Ordering::Less,
(TaskType::Filter, TaskType::Filter) => self.id.cmp(&other.id),
(TaskType::Filter, TaskType::Filter) => self.id.cmp(&other.id).reverse(),
}
}
}
pub type ContentId = String;
pub struct SnapshotTaskContext {}
pub struct FilterTaskContext {}
pub enum TaskContent {
Snapshot { content_id: ContentId },
Filter,
Snapshot { context: SnapshotTaskContext },
Filter { context: FilterTaskContext },
}
pub struct Task {