chore: add task test

This commit is contained in:
appflowy 2022-07-19 20:39:05 +08:00
parent 26c5dcd182
commit 45011e2dd3
7 changed files with 210 additions and 48 deletions

View File

@ -130,11 +130,7 @@ impl GridFilterService {
let handler_id = self.grid_pad.read().await.grid_id();
let context = FilterTaskContext { blocks };
Task {
handler_id,
id: task_id,
content: TaskContent::Filter(context),
}
Task::new(&handler_id, task_id, TaskContent::Filter(context))
}
async fn notify(&self, changesets: Vec<GridBlockChangesetPB>) {

View File

@ -1,23 +1,23 @@
use crate::manager::GridTaskSchedulerRwLock;
use crate::services::grid_editor::GridRevisionEditor;
use crate::services::tasks::{GridTaskHandler, Task, TaskContent, TaskHandlerId, TaskId};
use crate::services::tasks::{GridTaskHandler, Task, TaskContent, TaskId};
use flowy_error::FlowyError;
use futures::future::BoxFuture;
use lib_infra::future::BoxResultFuture;
pub(crate) trait GridServiceTaskScheduler: Send + Sync + 'static {
fn gen_task_id(&self) -> BoxFuture<TaskId>;
fn register_task(&self, task: Task) -> BoxFuture<()>;
fn add_task(&self, task: Task) -> BoxFuture<()>;
}
impl GridTaskHandler for GridRevisionEditor {
fn handler_id(&self) -> &TaskHandlerId {
fn handler_id(&self) -> &str {
&self.grid_id
}
fn process_task(&self, task: Task) -> BoxResultFuture<(), FlowyError> {
fn process_content(&self, content: TaskContent) -> BoxResultFuture<(), FlowyError> {
Box::pin(async move {
match task.content {
match content {
TaskContent::Snapshot => {}
TaskContent::Filter(context) => self.filter_service.process(context).await?,
}
@ -32,10 +32,10 @@ impl GridServiceTaskScheduler for GridTaskSchedulerRwLock {
Box::pin(async move { this.read().await.next_task_id() })
}
fn register_task(&self, task: Task) -> BoxFuture<()> {
fn add_task(&self, task: Task) -> BoxFuture<()> {
let this = self.clone();
Box::pin(async move {
this.write().await.register_task(task);
this.write().await.add_task(task);
})
}
}

View File

@ -20,7 +20,12 @@ impl GridTaskQueue {
}
pub(crate) fn push(&mut self, task: &Task) {
let task_type = match task.content {
if task.content.is_none() {
tracing::warn!("Ignore task: {} with empty content", task.id);
return;
}
let task_type = match task.content.as_ref().unwrap() {
TaskContent::Snapshot => TaskType::Snapshot,
TaskContent::Filter { .. } => TaskType::Filter,
};
@ -28,7 +33,7 @@ impl GridTaskQueue {
ty: task_type,
id: task.id,
};
match self.index_tasks.entry("1".to_owned()) {
match self.index_tasks.entry(task.handler_id.clone()) {
Entry::Occupied(entry) => {
let mut list = entry.get().borrow_mut();
assert!(list.peek().map(|old_id| pending_task.id >= old_id.id).unwrap_or(true));
@ -44,6 +49,11 @@ impl GridTaskQueue {
}
}
#[allow(dead_code)]
pub(crate) fn clear(&mut self) {
self.queue.clear();
}
pub(crate) fn mut_head<T, F>(&mut self, mut f: F) -> Option<T>
where
F: FnMut(&mut TaskList) -> Option<T>,

View File

@ -1,4 +1,5 @@
use crate::services::tasks::scheduler::GridTaskScheduler;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{watch, RwLock};
@ -7,13 +8,13 @@ use tokio::time::interval;
pub struct GridTaskRunner {
scheduler: Arc<RwLock<GridTaskScheduler>>,
debounce_duration: Duration,
notifier: Option<watch::Receiver<()>>,
notifier: Option<watch::Receiver<bool>>,
}
impl GridTaskRunner {
pub fn new(
scheduler: Arc<RwLock<GridTaskScheduler>>,
notifier: watch::Receiver<()>,
notifier: watch::Receiver<bool>,
debounce_duration: Duration,
) -> Self {
Self {
@ -34,12 +35,13 @@ impl GridTaskRunner {
// The runner will be stopped if the corresponding Sender drop.
break;
}
if *notifier.borrow() {
break;
}
let mut interval = interval(self.debounce_duration);
interval.tick().await;
if let Err(e) = self.scheduler.write().await.process_next_task().await {
tracing::error!("{:?}", e);
}
let _ = self.scheduler.write().await.process_next_task().await;
}
}
}

View File

@ -3,8 +3,8 @@ use crate::services::tasks::runner::GridTaskRunner;
use crate::services::tasks::store::GridTaskStore;
use crate::services::tasks::task::Task;
use crate::services::tasks::TaskId;
use flowy_error::{FlowyError, FlowyResult};
use crate::services::tasks::{TaskContent, TaskId, TaskStatus};
use flowy_error::FlowyError;
use lib_infra::future::BoxResultFuture;
use std::collections::HashMap;
use std::sync::Arc;
@ -12,21 +12,21 @@ use std::time::Duration;
use tokio::sync::{watch, RwLock};
pub(crate) trait GridTaskHandler: Send + Sync + 'static {
fn handler_id(&self) -> &TaskHandlerId;
fn handler_id(&self) -> &str;
fn process_task(&self, task: Task) -> BoxResultFuture<(), FlowyError>;
fn process_content(&self, content: TaskContent) -> BoxResultFuture<(), FlowyError>;
}
pub struct GridTaskScheduler {
queue: GridTaskQueue,
store: GridTaskStore,
notifier: watch::Sender<()>,
notifier: watch::Sender<bool>,
handlers: HashMap<TaskHandlerId, Arc<dyn GridTaskHandler>>,
}
impl GridTaskScheduler {
pub(crate) fn new() -> Arc<RwLock<Self>> {
let (notifier, rx) = watch::channel(());
let (notifier, rx) = watch::channel(false);
let scheduler = Self {
queue: GridTaskQueue::new(),
@ -57,25 +57,38 @@ impl GridTaskScheduler {
let _ = self.handlers.remove(handler_id.as_ref());
}
pub(crate) async fn process_next_task(&mut self) -> FlowyResult<()> {
let mut get_next_task = || {
#[allow(dead_code)]
pub(crate) fn stop(&mut self) {
let _ = self.notifier.send(true);
self.queue.clear();
self.store.clear();
}
pub(crate) async fn process_next_task(&mut self) -> Option<()> {
let pending_task = self.queue.mut_head(|list| list.pop())?;
let task = self.store.remove_task(&pending_task.id)?;
Some(task)
let mut task = self.store.remove_task(&pending_task.id)?;
let handler = self.handlers.get(&task.handler_id)?;
let ret = task.ret.take()?;
let content = task.content.take()?;
task.set_status(TaskStatus::Processing);
let _ = match handler.process_content(content).await {
Ok(_) => {
task.set_status(TaskStatus::Done);
let _ = ret.send(task.into());
}
Err(e) => {
tracing::error!("Process task failed: {:?}", e);
task.set_status(TaskStatus::Failure);
let _ = ret.send(task.into());
}
};
if let Some(task) = get_next_task() {
match self.handlers.get(&task.handler_id) {
None => {}
Some(handler) => {
let _ = handler.process_task(task).await;
}
}
}
Ok(())
self.notify();
None
}
pub(crate) fn register_task(&mut self, task: Task) {
pub(crate) fn add_task(&mut self, task: Task) {
assert!(!task.is_finished());
self.queue.push(&task);
self.store.insert_task(task);
@ -87,6 +100,87 @@ impl GridTaskScheduler {
}
pub(crate) fn notify(&self) {
let _ = self.notifier.send(());
let _ = self.notifier.send(false);
}
}
#[cfg(test)]
mod tests {
use crate::services::grid_editor_task::GridServiceTaskScheduler;
use crate::services::tasks::{GridTaskHandler, GridTaskScheduler, Task, TaskContent, TaskStatus};
use flowy_error::FlowyError;
use lib_infra::future::BoxResultFuture;
use std::sync::Arc;
use std::time::Duration;
use tokio::time::interval;
#[tokio::test]
async fn task_scheduler_snapshot_task_test() {
let scheduler = GridTaskScheduler::new();
scheduler
.write()
.await
.register_handler(Arc::new(MockGridTaskHandler()));
let task_id = scheduler.gen_task_id().await;
let mut task = Task::new("1", task_id, TaskContent::Snapshot);
let rx = task.rx.take().unwrap();
scheduler.write().await.add_task(task);
assert_eq!(rx.await.unwrap().status, TaskStatus::Done);
}
#[tokio::test]
async fn task_scheduler_snapshot_task_cancel_test() {
let scheduler = GridTaskScheduler::new();
scheduler
.write()
.await
.register_handler(Arc::new(MockGridTaskHandler()));
let task_id = scheduler.gen_task_id().await;
let mut task = Task::new("1", task_id, TaskContent::Snapshot);
let rx = task.rx.take().unwrap();
scheduler.write().await.add_task(task);
scheduler.write().await.stop();
assert_eq!(rx.await.unwrap().status, TaskStatus::Cancel);
}
#[tokio::test]
async fn task_scheduler_multi_task_test() {
let scheduler = GridTaskScheduler::new();
scheduler
.write()
.await
.register_handler(Arc::new(MockGridTaskHandler()));
let task_id = scheduler.gen_task_id().await;
let mut task_1 = Task::new("1", task_id, TaskContent::Snapshot);
let rx_1 = task_1.rx.take().unwrap();
let task_id = scheduler.gen_task_id().await;
let mut task_2 = Task::new("1", task_id, TaskContent::Snapshot);
let rx_2 = task_2.rx.take().unwrap();
scheduler.write().await.add_task(task_1);
scheduler.write().await.add_task(task_2);
assert_eq!(rx_1.await.unwrap().status, TaskStatus::Done);
assert_eq!(rx_2.await.unwrap().status, TaskStatus::Done);
}
struct MockGridTaskHandler();
impl GridTaskHandler for MockGridTaskHandler {
fn handler_id(&self) -> &str {
"1"
}
fn process_content(&self, _content: TaskContent) -> BoxResultFuture<(), FlowyError> {
Box::pin(async move {
let mut interval = interval(Duration::from_secs(1));
interval.tick().await;
interval.tick().await;
Ok(())
})
}
}
}

View File

@ -1,6 +1,7 @@
use crate::services::tasks::task::Task;
use crate::services::tasks::TaskId;
use crate::services::tasks::{TaskId, TaskStatus};
use std::collections::HashMap;
use std::mem;
use std::sync::atomic::AtomicU32;
use std::sync::atomic::Ordering::SeqCst;
@ -25,6 +26,18 @@ impl GridTaskStore {
self.tasks.remove(task_id)
}
#[allow(dead_code)]
pub(crate) fn clear(&mut self) {
let tasks = mem::take(&mut self.tasks);
tasks.into_values().for_each(|mut task| {
if task.ret.is_some() {
let ret = task.ret.take().unwrap();
task.set_status(TaskStatus::Cancel);
let _ = ret.send(task.into());
}
});
}
pub(crate) fn next_task_id(&self) -> TaskId {
let _ = self.task_id_counter.fetch_add(1, SeqCst);
self.task_id_counter.load(SeqCst)

View File

@ -1,3 +1,5 @@
#![allow(clippy::all)]
#![allow(dead_code)]
use crate::services::row::GridBlockSnapshot;
use crate::services::tasks::queue::TaskHandlerId;
use std::cmp::Ordering;
@ -60,14 +62,59 @@ pub(crate) enum TaskContent {
Filter(FilterTaskContext),
}
#[derive(Debug, Eq, PartialEq)]
pub(crate) enum TaskStatus {
Pending,
Processing,
Done,
Failure,
Cancel,
}
pub(crate) struct Task {
pub handler_id: TaskHandlerId,
pub id: TaskId,
pub content: TaskContent,
pub handler_id: TaskHandlerId,
pub content: Option<TaskContent>,
status: TaskStatus,
pub ret: Option<tokio::sync::oneshot::Sender<TaskResult>>,
pub rx: Option<tokio::sync::oneshot::Receiver<TaskResult>>,
}
pub(crate) struct TaskResult {
pub id: TaskId,
pub(crate) status: TaskStatus,
}
impl std::convert::From<Task> for TaskResult {
fn from(task: Task) -> Self {
TaskResult {
id: task.id,
status: task.status,
}
}
}
impl Task {
pub fn new(handler_id: &str, id: TaskId, content: TaskContent) -> Self {
let (ret, rx) = tokio::sync::oneshot::channel();
Self {
handler_id: handler_id.to_owned(),
id,
content: Some(content),
ret: Some(ret),
rx: Some(rx),
status: TaskStatus::Pending,
}
}
pub fn set_status(&mut self, status: TaskStatus) {
self.status = status;
}
pub fn is_finished(&self) -> bool {
todo!()
match self.status {
TaskStatus::Done => true,
_ => false,
}
}
}