allow async jobs to set intermediate progress

This commit is contained in:
Stephan Dilly 2021-09-02 18:29:03 +02:00
parent 0454e2a1cd
commit 40e03ba7de
6 changed files with 78 additions and 31 deletions

View File

@ -4,18 +4,54 @@
use crate::error::Result;
use crossbeam_channel::Sender;
use std::sync::{Arc, Mutex};
use std::sync::{Arc, Mutex, RwLock};
/// Passed to `AsyncJob::run` allowing sending intermediate progress notifications
pub struct RunParams<T: Copy + Send, P: Clone + Send + Sync> {
sender: Sender<T>,
progress: Arc<RwLock<P>>,
}
impl<T: Copy + Send, P: Clone + Send + Sync> RunParams<T, P> {
/// send an intermediate update notification.
/// do not confuse this with the return value of `run`.
/// `send` should only be used about progress notifications
/// and not for the final notifcation indicating the end of the async job.
/// see `run` for more info
pub fn send(&self, notification: T) -> Result<()> {
self.sender.send(notification)?;
Ok(())
}
/// set the current progress
pub fn set_progress(&self, p: P) -> Result<()> {
*(self.progress.write()?) = p;
Ok(())
}
}
/// trait that defines an async task we can run on a threadpool
pub trait AsyncJob: Send + Sync + Clone {
/// defines what notification to send after finish running job
type Notification: Copy + Send + 'static;
/// defines what notification type is used to communicate outside
type Notification: Copy + Send;
/// type of progress
type Progress: Clone + Default + Send + Sync;
/// can run a synchronous time intensive task
/// can run a synchronous time intensive task.
/// the returned notification is used to tell interested parties
/// that the job finished and the job can be access via `take_last`.
/// prior to this final notification it is not safe to assume `take_last`
/// will already return the correct job
fn run(
&mut self,
sender: Sender<Self::Notification>,
params: RunParams<Self::Notification, Self::Progress>,
) -> Result<Self::Notification>;
/// allows observers to get intermediate progress status if the job customizes it
/// by default this will be returning ()::Default
fn get_progress(&self) -> Self::Progress {
Self::Progress::default()
}
}
/// Abstraction for a FIFO task queue that will only queue up **one** `next` job.
@ -24,6 +60,7 @@ pub trait AsyncJob: Send + Sync + Clone {
pub struct AsyncSingleJob<J: AsyncJob> {
next: Arc<Mutex<Option<J>>>,
last: Arc<Mutex<Option<J>>>,
progress: Arc<RwLock<J::Progress>>,
sender: Sender<J::Notification>,
pending: Arc<Mutex<()>>,
}
@ -35,6 +72,7 @@ impl<J: 'static + AsyncJob> AsyncSingleJob<J> {
next: Arc::new(Mutex::new(None)),
last: Arc::new(Mutex::new(None)),
pending: Arc::new(Mutex::new(())),
progress: Arc::new(RwLock::new(J::Progress::default())),
sender,
}
}
@ -73,16 +111,20 @@ impl<J: 'static + AsyncJob> AsyncSingleJob<J> {
self.check_for_job()
}
///
pub fn progress(&self) -> Option<J::Progress> {
self.progress.read().ok().map(|d| (*d).clone())
}
fn check_for_job(&self) -> bool {
if self.is_pending() {
return false;
}
if let Some(task) = self.take_next() {
let self_arc = self.clone();
let self_clone = (*self).clone();
rayon_core::spawn(move || {
if let Err(e) = self_arc.run_job(task) {
if let Err(e) = self_clone.run_job(task) {
log::error!("async job error: {}", e);
}
});
@ -98,7 +140,10 @@ impl<J: 'static + AsyncJob> AsyncSingleJob<J> {
{
let _pending = self.pending.lock()?;
let notification = task.run(self.sender.clone())?;
let notification = task.run(RunParams {
progress: self.progress.clone(),
sender: self.sender.clone(),
})?;
if let Ok(mut last) = self.last.lock() {
*last = Some(task);
@ -149,10 +194,11 @@ mod test {
impl AsyncJob for TestJob {
type Notification = TestNotificaton;
type Progress = ();
fn run(
&mut self,
_sender: Sender<Self::Notification>,
_params: RunParams<Self::Notification, Self::Progress>,
) -> Result<Self::Notification> {
println!("[job] wait");

View File

@ -4,7 +4,7 @@ use easy_cast::{Conv, ConvFloat};
use std::cmp;
///
#[derive(Clone, Copy, Debug, PartialEq)]
#[derive(Clone, Copy, Default, Debug, PartialEq)]
pub struct ProgressPercent {
/// percent 0..100
pub progress: u8,

View File

@ -1,9 +1,7 @@
//!
use crossbeam_channel::Sender;
use crate::{
asyncjob::AsyncJob,
asyncjob::{AsyncJob, RunParams},
error::Result,
sync::cred::BasicAuthCredential,
sync::remotes::{get_default_remote, tags_missing_remote},
@ -53,10 +51,11 @@ impl AsyncRemoteTagsJob {
impl AsyncJob for AsyncRemoteTagsJob {
type Notification = AsyncGitNotification;
type Progress = ();
fn run(
&mut self,
_sender: Sender<Self::Notification>,
_params: RunParams<Self::Notification, Self::Progress>,
) -> Result<Self::Notification> {
if let Ok(mut state) = self.state.lock() {
*state = state.take().map(|state| match state {

View File

@ -65,8 +65,9 @@ impl SyntaxTextComponent {
) = ev
{
match progress {
SyntaxHighlightProgress::Progress(progress) => {
self.syntax_progress = Some(progress);
SyntaxHighlightProgress::Progress => {
self.syntax_progress =
self.async_highlighting.progress();
}
SyntaxHighlightProgress::Done => {
self.syntax_progress = None;

View File

@ -78,7 +78,7 @@ pub enum QueueEvent {
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum SyntaxHighlightProgress {
Progress(asyncgit::ProgressPercent),
Progress,
Done,
}

View File

@ -1,5 +1,7 @@
use asyncgit::{asyncjob::AsyncJob, ProgressPercent};
use crossbeam_channel::Sender;
use asyncgit::{
asyncjob::{AsyncJob, RunParams},
ProgressPercent,
};
use lazy_static::lazy_static;
use scopetime::scope_time;
use std::{
@ -70,7 +72,7 @@ impl SyntaxText {
pub fn new(
text: String,
file_path: &Path,
sender: &Sender<AsyncAppNotification>,
params: &RunParams<AsyncAppNotification, ProgressPercent>,
) -> asyncgit::Result<Self> {
scope_time!("syntax_highlighting");
log::debug!("syntax: {:?}", file_path);
@ -110,10 +112,9 @@ impl SyntaxText {
total_count,
Duration::from_millis(200),
);
sender.send(AsyncAppNotification::SyntaxHighlighting(
SyntaxHighlightProgress::Progress(
buffer.send_progress(),
),
params.set_progress(buffer.send_progress())?;
params.send(AsyncAppNotification::SyntaxHighlighting(
SyntaxHighlightProgress::Progress,
))?;
for (number, line) in text.lines().enumerate() {
@ -134,11 +135,10 @@ impl SyntaxText {
});
if buffer.update(number) {
sender.send(
params.set_progress(buffer.send_progress())?;
params.send(
AsyncAppNotification::SyntaxHighlighting(
SyntaxHighlightProgress::Progress(
buffer.send_progress(),
),
SyntaxHighlightProgress::Progress,
),
)?;
}
@ -241,10 +241,11 @@ impl AsyncSyntaxJob {
impl AsyncJob for AsyncSyntaxJob {
type Notification = AsyncAppNotification;
type Progress = ProgressPercent;
fn run(
&mut self,
sender: Sender<Self::Notification>,
params: RunParams<Self::Notification, Self::Progress>,
) -> asyncgit::Result<Self::Notification> {
let mut state_mutex = self.state.lock()?;
@ -254,7 +255,7 @@ impl AsyncJob for AsyncSyntaxJob {
let syntax = SyntaxText::new(
content,
Path::new(&path),
&sender,
&params,
)?;
JobState::Response(syntax)
}