From 45011e2dd3cf77eddda683a39b620ec714a741cb Mon Sep 17 00:00:00 2001 From: appflowy Date: Tue, 19 Jul 2022 20:39:05 +0800 Subject: [PATCH] chore: add task test --- .../src/services/filter/filter_service.rs | 6 +- .../src/services/grid_editor_task.rs | 14 +- .../flowy-grid/src/services/tasks/queue.rs | 14 +- .../flowy-grid/src/services/tasks/runner.rs | 14 +- .../src/services/tasks/scheduler.rs | 142 +++++++++++++++--- .../flowy-grid/src/services/tasks/store.rs | 15 +- .../flowy-grid/src/services/tasks/task.rs | 53 ++++++- 7 files changed, 210 insertions(+), 48 deletions(-) diff --git a/frontend/rust-lib/flowy-grid/src/services/filter/filter_service.rs b/frontend/rust-lib/flowy-grid/src/services/filter/filter_service.rs index be02e80ebd..f3b1d72fb8 100644 --- a/frontend/rust-lib/flowy-grid/src/services/filter/filter_service.rs +++ b/frontend/rust-lib/flowy-grid/src/services/filter/filter_service.rs @@ -130,11 +130,7 @@ impl GridFilterService { let handler_id = self.grid_pad.read().await.grid_id(); let context = FilterTaskContext { blocks }; - Task { - handler_id, - id: task_id, - content: TaskContent::Filter(context), - } + Task::new(&handler_id, task_id, TaskContent::Filter(context)) } async fn notify(&self, changesets: Vec) { diff --git a/frontend/rust-lib/flowy-grid/src/services/grid_editor_task.rs b/frontend/rust-lib/flowy-grid/src/services/grid_editor_task.rs index 1b3a364833..0338730818 100644 --- a/frontend/rust-lib/flowy-grid/src/services/grid_editor_task.rs +++ b/frontend/rust-lib/flowy-grid/src/services/grid_editor_task.rs @@ -1,23 +1,23 @@ use crate::manager::GridTaskSchedulerRwLock; use crate::services::grid_editor::GridRevisionEditor; -use crate::services::tasks::{GridTaskHandler, Task, TaskContent, TaskHandlerId, TaskId}; +use crate::services::tasks::{GridTaskHandler, Task, TaskContent, TaskId}; use flowy_error::FlowyError; use futures::future::BoxFuture; use lib_infra::future::BoxResultFuture; pub(crate) trait GridServiceTaskScheduler: Send + Sync + 'static { fn gen_task_id(&self) -> BoxFuture; - fn register_task(&self, task: Task) -> BoxFuture<()>; + fn add_task(&self, task: Task) -> BoxFuture<()>; } impl GridTaskHandler for GridRevisionEditor { - fn handler_id(&self) -> &TaskHandlerId { + fn handler_id(&self) -> &str { &self.grid_id } - fn process_task(&self, task: Task) -> BoxResultFuture<(), FlowyError> { + fn process_content(&self, content: TaskContent) -> BoxResultFuture<(), FlowyError> { Box::pin(async move { - match task.content { + match content { TaskContent::Snapshot => {} TaskContent::Filter(context) => self.filter_service.process(context).await?, } @@ -32,10 +32,10 @@ impl GridServiceTaskScheduler for GridTaskSchedulerRwLock { Box::pin(async move { this.read().await.next_task_id() }) } - fn register_task(&self, task: Task) -> BoxFuture<()> { + fn add_task(&self, task: Task) -> BoxFuture<()> { let this = self.clone(); Box::pin(async move { - this.write().await.register_task(task); + this.write().await.add_task(task); }) } } diff --git a/frontend/rust-lib/flowy-grid/src/services/tasks/queue.rs b/frontend/rust-lib/flowy-grid/src/services/tasks/queue.rs index 537f789f9b..1ba97afd9a 100644 --- a/frontend/rust-lib/flowy-grid/src/services/tasks/queue.rs +++ b/frontend/rust-lib/flowy-grid/src/services/tasks/queue.rs @@ -20,7 +20,12 @@ impl GridTaskQueue { } pub(crate) fn push(&mut self, task: &Task) { - let task_type = match task.content { + if task.content.is_none() { + tracing::warn!("Ignore task: {} with empty content", task.id); + return; + } + + let task_type = match task.content.as_ref().unwrap() { TaskContent::Snapshot => TaskType::Snapshot, TaskContent::Filter { .. } => TaskType::Filter, }; @@ -28,7 +33,7 @@ impl GridTaskQueue { ty: task_type, id: task.id, }; - match self.index_tasks.entry("1".to_owned()) { + match self.index_tasks.entry(task.handler_id.clone()) { Entry::Occupied(entry) => { let mut list = entry.get().borrow_mut(); assert!(list.peek().map(|old_id| pending_task.id >= old_id.id).unwrap_or(true)); @@ -44,6 +49,11 @@ impl GridTaskQueue { } } + #[allow(dead_code)] + pub(crate) fn clear(&mut self) { + self.queue.clear(); + } + pub(crate) fn mut_head(&mut self, mut f: F) -> Option where F: FnMut(&mut TaskList) -> Option, diff --git a/frontend/rust-lib/flowy-grid/src/services/tasks/runner.rs b/frontend/rust-lib/flowy-grid/src/services/tasks/runner.rs index 713fc15c86..121329edfb 100644 --- a/frontend/rust-lib/flowy-grid/src/services/tasks/runner.rs +++ b/frontend/rust-lib/flowy-grid/src/services/tasks/runner.rs @@ -1,4 +1,5 @@ use crate::services::tasks::scheduler::GridTaskScheduler; + use std::sync::Arc; use std::time::Duration; use tokio::sync::{watch, RwLock}; @@ -7,13 +8,13 @@ use tokio::time::interval; pub struct GridTaskRunner { scheduler: Arc>, debounce_duration: Duration, - notifier: Option>, + notifier: Option>, } impl GridTaskRunner { pub fn new( scheduler: Arc>, - notifier: watch::Receiver<()>, + notifier: watch::Receiver, debounce_duration: Duration, ) -> Self { Self { @@ -34,12 +35,13 @@ impl GridTaskRunner { // The runner will be stopped if the corresponding Sender drop. break; } + + if *notifier.borrow() { + break; + } let mut interval = interval(self.debounce_duration); interval.tick().await; - - if let Err(e) = self.scheduler.write().await.process_next_task().await { - tracing::error!("{:?}", e); - } + let _ = self.scheduler.write().await.process_next_task().await; } } } diff --git a/frontend/rust-lib/flowy-grid/src/services/tasks/scheduler.rs b/frontend/rust-lib/flowy-grid/src/services/tasks/scheduler.rs index 14494842ec..73ba298d9b 100644 --- a/frontend/rust-lib/flowy-grid/src/services/tasks/scheduler.rs +++ b/frontend/rust-lib/flowy-grid/src/services/tasks/scheduler.rs @@ -3,8 +3,8 @@ use crate::services::tasks::runner::GridTaskRunner; use crate::services::tasks::store::GridTaskStore; use crate::services::tasks::task::Task; -use crate::services::tasks::TaskId; -use flowy_error::{FlowyError, FlowyResult}; +use crate::services::tasks::{TaskContent, TaskId, TaskStatus}; +use flowy_error::FlowyError; use lib_infra::future::BoxResultFuture; use std::collections::HashMap; use std::sync::Arc; @@ -12,21 +12,21 @@ use std::time::Duration; use tokio::sync::{watch, RwLock}; pub(crate) trait GridTaskHandler: Send + Sync + 'static { - fn handler_id(&self) -> &TaskHandlerId; + fn handler_id(&self) -> &str; - fn process_task(&self, task: Task) -> BoxResultFuture<(), FlowyError>; + fn process_content(&self, content: TaskContent) -> BoxResultFuture<(), FlowyError>; } pub struct GridTaskScheduler { queue: GridTaskQueue, store: GridTaskStore, - notifier: watch::Sender<()>, + notifier: watch::Sender, handlers: HashMap>, } impl GridTaskScheduler { pub(crate) fn new() -> Arc> { - let (notifier, rx) = watch::channel(()); + let (notifier, rx) = watch::channel(false); let scheduler = Self { queue: GridTaskQueue::new(), @@ -57,25 +57,38 @@ impl GridTaskScheduler { let _ = self.handlers.remove(handler_id.as_ref()); } - pub(crate) async fn process_next_task(&mut self) -> FlowyResult<()> { - let mut get_next_task = || { - let pending_task = self.queue.mut_head(|list| list.pop())?; - let task = self.store.remove_task(&pending_task.id)?; - Some(task) - }; - - if let Some(task) = get_next_task() { - match self.handlers.get(&task.handler_id) { - None => {} - Some(handler) => { - let _ = handler.process_task(task).await; - } - } - } - Ok(()) + #[allow(dead_code)] + pub(crate) fn stop(&mut self) { + let _ = self.notifier.send(true); + self.queue.clear(); + self.store.clear(); } - pub(crate) fn register_task(&mut self, task: Task) { + pub(crate) async fn process_next_task(&mut self) -> Option<()> { + let pending_task = self.queue.mut_head(|list| list.pop())?; + let mut task = self.store.remove_task(&pending_task.id)?; + let handler = self.handlers.get(&task.handler_id)?; + + let ret = task.ret.take()?; + let content = task.content.take()?; + + task.set_status(TaskStatus::Processing); + let _ = match handler.process_content(content).await { + Ok(_) => { + task.set_status(TaskStatus::Done); + let _ = ret.send(task.into()); + } + Err(e) => { + tracing::error!("Process task failed: {:?}", e); + task.set_status(TaskStatus::Failure); + let _ = ret.send(task.into()); + } + }; + self.notify(); + None + } + + pub(crate) fn add_task(&mut self, task: Task) { assert!(!task.is_finished()); self.queue.push(&task); self.store.insert_task(task); @@ -87,6 +100,87 @@ impl GridTaskScheduler { } pub(crate) fn notify(&self) { - let _ = self.notifier.send(()); + let _ = self.notifier.send(false); + } +} + +#[cfg(test)] +mod tests { + use crate::services::grid_editor_task::GridServiceTaskScheduler; + use crate::services::tasks::{GridTaskHandler, GridTaskScheduler, Task, TaskContent, TaskStatus}; + use flowy_error::FlowyError; + use lib_infra::future::BoxResultFuture; + use std::sync::Arc; + use std::time::Duration; + use tokio::time::interval; + + #[tokio::test] + async fn task_scheduler_snapshot_task_test() { + let scheduler = GridTaskScheduler::new(); + scheduler + .write() + .await + .register_handler(Arc::new(MockGridTaskHandler())); + + let task_id = scheduler.gen_task_id().await; + let mut task = Task::new("1", task_id, TaskContent::Snapshot); + let rx = task.rx.take().unwrap(); + scheduler.write().await.add_task(task); + assert_eq!(rx.await.unwrap().status, TaskStatus::Done); + } + + #[tokio::test] + async fn task_scheduler_snapshot_task_cancel_test() { + let scheduler = GridTaskScheduler::new(); + scheduler + .write() + .await + .register_handler(Arc::new(MockGridTaskHandler())); + + let task_id = scheduler.gen_task_id().await; + let mut task = Task::new("1", task_id, TaskContent::Snapshot); + let rx = task.rx.take().unwrap(); + scheduler.write().await.add_task(task); + scheduler.write().await.stop(); + + assert_eq!(rx.await.unwrap().status, TaskStatus::Cancel); + } + + #[tokio::test] + async fn task_scheduler_multi_task_test() { + let scheduler = GridTaskScheduler::new(); + scheduler + .write() + .await + .register_handler(Arc::new(MockGridTaskHandler())); + + let task_id = scheduler.gen_task_id().await; + let mut task_1 = Task::new("1", task_id, TaskContent::Snapshot); + let rx_1 = task_1.rx.take().unwrap(); + + let task_id = scheduler.gen_task_id().await; + let mut task_2 = Task::new("1", task_id, TaskContent::Snapshot); + let rx_2 = task_2.rx.take().unwrap(); + + scheduler.write().await.add_task(task_1); + scheduler.write().await.add_task(task_2); + + assert_eq!(rx_1.await.unwrap().status, TaskStatus::Done); + assert_eq!(rx_2.await.unwrap().status, TaskStatus::Done); + } + struct MockGridTaskHandler(); + impl GridTaskHandler for MockGridTaskHandler { + fn handler_id(&self) -> &str { + "1" + } + + fn process_content(&self, _content: TaskContent) -> BoxResultFuture<(), FlowyError> { + Box::pin(async move { + let mut interval = interval(Duration::from_secs(1)); + interval.tick().await; + interval.tick().await; + Ok(()) + }) + } } } diff --git a/frontend/rust-lib/flowy-grid/src/services/tasks/store.rs b/frontend/rust-lib/flowy-grid/src/services/tasks/store.rs index 21aae60bc4..9f14889e4d 100644 --- a/frontend/rust-lib/flowy-grid/src/services/tasks/store.rs +++ b/frontend/rust-lib/flowy-grid/src/services/tasks/store.rs @@ -1,6 +1,7 @@ use crate::services::tasks::task::Task; -use crate::services::tasks::TaskId; +use crate::services::tasks::{TaskId, TaskStatus}; use std::collections::HashMap; +use std::mem; use std::sync::atomic::AtomicU32; use std::sync::atomic::Ordering::SeqCst; @@ -25,6 +26,18 @@ impl GridTaskStore { self.tasks.remove(task_id) } + #[allow(dead_code)] + pub(crate) fn clear(&mut self) { + let tasks = mem::take(&mut self.tasks); + tasks.into_values().for_each(|mut task| { + if task.ret.is_some() { + let ret = task.ret.take().unwrap(); + task.set_status(TaskStatus::Cancel); + let _ = ret.send(task.into()); + } + }); + } + pub(crate) fn next_task_id(&self) -> TaskId { let _ = self.task_id_counter.fetch_add(1, SeqCst); self.task_id_counter.load(SeqCst) diff --git a/frontend/rust-lib/flowy-grid/src/services/tasks/task.rs b/frontend/rust-lib/flowy-grid/src/services/tasks/task.rs index 92575dabdc..92950b02aa 100644 --- a/frontend/rust-lib/flowy-grid/src/services/tasks/task.rs +++ b/frontend/rust-lib/flowy-grid/src/services/tasks/task.rs @@ -1,3 +1,5 @@ +#![allow(clippy::all)] +#![allow(dead_code)] use crate::services::row::GridBlockSnapshot; use crate::services::tasks::queue::TaskHandlerId; use std::cmp::Ordering; @@ -60,14 +62,59 @@ pub(crate) enum TaskContent { Filter(FilterTaskContext), } +#[derive(Debug, Eq, PartialEq)] +pub(crate) enum TaskStatus { + Pending, + Processing, + Done, + Failure, + Cancel, +} + pub(crate) struct Task { - pub handler_id: TaskHandlerId, pub id: TaskId, - pub content: TaskContent, + pub handler_id: TaskHandlerId, + pub content: Option, + status: TaskStatus, + pub ret: Option>, + pub rx: Option>, +} + +pub(crate) struct TaskResult { + pub id: TaskId, + pub(crate) status: TaskStatus, +} + +impl std::convert::From for TaskResult { + fn from(task: Task) -> Self { + TaskResult { + id: task.id, + status: task.status, + } + } } impl Task { + pub fn new(handler_id: &str, id: TaskId, content: TaskContent) -> Self { + let (ret, rx) = tokio::sync::oneshot::channel(); + Self { + handler_id: handler_id.to_owned(), + id, + content: Some(content), + ret: Some(ret), + rx: Some(rx), + status: TaskStatus::Pending, + } + } + + pub fn set_status(&mut self, status: TaskStatus) { + self.status = status; + } + pub fn is_finished(&self) -> bool { - todo!() + match self.status { + TaskStatus::Done => true, + _ => false, + } } }