add basic dispatchers tests

This commit is contained in:
Nikita Galaiko 2023-06-02 11:08:47 +02:00
parent fef2f97b20
commit 6744016ffe
7 changed files with 49 additions and 23 deletions

View File

@ -1,4 +1,3 @@
# Generated by Cargo
# will have compiled files and executables
/target/

1
src-tauri/Cargo.lock generated
View File

@ -1473,7 +1473,6 @@ dependencies = [
"filetime",
"fslock",
"futures",
"futures-util",
"git2",
"log",
"md5",

View File

@ -42,7 +42,6 @@ tokio-tungstenite = "0.18.0"
portable-pty = { version = "0.8.0", features = [ "serde_support" ] }
bytes = "1.1.0"
futures = "0.3"
futures-util = "0.3.8"
timed = "0.2.1"
serde-jsonlines = "0.4.0"
scopeguard = "1.1.0"

View File

@ -4,10 +4,6 @@ use std::{
};
use anyhow::{Context, Result};
use futures::{
channel::mpsc::{channel, Receiver},
SinkExt, StreamExt,
};
use notify::{Config, Event, RecommendedWatcher, Watcher};
use tokio::sync::mpsc;
@ -58,7 +54,7 @@ impl Dispatcher {
log::info!("{}: file watcher started", self.project_id);
while let Some(res) = rx.next().await {
while let Some(res) = rx.recv().await {
match res {
Ok(event) => {
if !is_interesting_event(&event.kind) {
@ -105,17 +101,17 @@ fn is_interesting_event(kind: &notify::EventKind) -> bool {
)
}
fn async_watcher() -> notify::Result<(RecommendedWatcher, Receiver<notify::Result<Event>>)> {
let (mut tx, rx) = channel(1);
fn async_watcher() -> notify::Result<(
RecommendedWatcher,
mpsc::UnboundedReceiver<notify::Result<Event>>,
)> {
let (tx, rx) = mpsc::unbounded_channel();
let watcher = RecommendedWatcher::new(
move |res| {
futures::executor::block_on(async {
if let Err(err) = tx.send(res).await {
log::error!("failed to send file change event: {:#}", err);
}
println!("sent");
})
if let Err(err) = tx.send(res) {
log::error!("failed to send file change event: {:#}", err);
}
},
Config::default(),
)?;

View File

@ -49,3 +49,38 @@ impl Dispatcher {
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::time::Duration;
#[tokio::test]
async fn test_ticker() {
let (tx, mut rx) = mpsc::unbounded_channel();
let dispatcher = Dispatcher::new("test".to_string());
let dispatcher2 = dispatcher.clone();
let handle = tokio::spawn(async move {
dispatcher2
.start(Duration::from_millis(10), tx)
.await
.unwrap();
});
tokio::time::sleep(Duration::from_millis(50)).await;
dispatcher.stop().unwrap();
handle.await.unwrap();
let mut count = 0;
while let Some(event) = rx.recv().await {
match event {
events::Event::Tick(_) => count += 1,
_ => panic!("unexpected event: {:?}", event),
}
}
assert!(count >= 4);
}
}

View File

@ -2,7 +2,7 @@ use std::{fmt::Display, path, time};
use crate::{bookmarks, deltas, sessions};
#[derive(Debug)]
#[derive(Debug, PartialEq)]
pub enum Event {
Tick(time::SystemTime),
Flush(sessions::Session),

View File

@ -67,15 +67,13 @@ impl<'watcher> Watcher<'watcher> {
log::error!("{}: failed to post event: {:#}", self.project_id, e);
}
},
Some(event) = events_rx.recv() =>
match self.handler.handle(event) {
Some(event) = events_rx.recv() => match self.handler.handle(event) {
Err(err) => log::error!("{}: failed to handle event: {:#}", self.project_id, err),
Ok(events) => {
for event in events {
if let Err(e) = events_tx.send(event) {
log::error!("{}: failed to post event: {:#}", self.project_id, e);
}
if let Err(e) = events_tx.send(event) {
log::error!("{}: failed to post event: {:#}", self.project_id, e);
}
}
}
},