mirror of
https://github.com/gitbutlerapp/gitbutler.git
synced 2024-11-28 22:03:30 +03:00
Merge pull request #3737 from anaisbetts/flushable-debounce
Avoid file locks by being smarter about when we try to rebuild our information
This commit is contained in:
commit
9f318626f8
18
Cargo.lock
generated
18
Cargo.lock
generated
@ -2211,19 +2211,21 @@ dependencies = [
|
||||
"anyhow",
|
||||
"backoff",
|
||||
"crossbeam-channel",
|
||||
"file-id",
|
||||
"futures",
|
||||
"git2",
|
||||
"gitbutler-core",
|
||||
"gitbutler-testsupport",
|
||||
"itertools 0.12.1",
|
||||
"notify",
|
||||
"notify-debouncer-full",
|
||||
"once_cell",
|
||||
"parking_lot 0.12.1",
|
||||
"tempfile",
|
||||
"thiserror",
|
||||
"tokio",
|
||||
"tokio-util",
|
||||
"tracing",
|
||||
"walkdir",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@ -4102,20 +4104,6 @@ dependencies = [
|
||||
"windows-sys 0.48.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "notify-debouncer-full"
|
||||
version = "0.3.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "49f5dab59c348b9b50cf7f261960a20e389feb2713636399cd9082cd4b536154"
|
||||
dependencies = [
|
||||
"crossbeam-channel",
|
||||
"file-id",
|
||||
"log",
|
||||
"notify",
|
||||
"parking_lot 0.12.1",
|
||||
"walkdir",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "ntapi"
|
||||
version = "0.4.1"
|
||||
|
@ -1,11 +1,14 @@
|
||||
use std::io::Write;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::{
|
||||
io::Write,
|
||||
path::{Path, PathBuf},
|
||||
};
|
||||
|
||||
use anyhow::Result;
|
||||
use bstr::BString;
|
||||
use gix::dir::walk::EmissionMode;
|
||||
use gix::tempfile::create_dir::Retries;
|
||||
use gix::tempfile::{AutoRemove, ContainingDirectory};
|
||||
use gix::{
|
||||
dir::walk::EmissionMode,
|
||||
tempfile::{create_dir::Retries, AutoRemove, ContainingDirectory},
|
||||
};
|
||||
use walkdir::WalkDir;
|
||||
|
||||
// Returns an ordered list of relative paths for files inside a directory recursively.
|
||||
@ -58,21 +61,13 @@ pub(crate) fn write<P: AsRef<Path>>(
|
||||
file_path: P,
|
||||
contents: impl AsRef<[u8]>,
|
||||
) -> anyhow::Result<()> {
|
||||
#[cfg(windows)]
|
||||
{
|
||||
Ok(std::fs::write(file_path, contents)?)
|
||||
}
|
||||
|
||||
#[cfg(not(windows))]
|
||||
{
|
||||
let mut temp_file = gix::tempfile::new(
|
||||
file_path.as_ref().parent().unwrap(),
|
||||
ContainingDirectory::Exists,
|
||||
AutoRemove::Tempfile,
|
||||
)?;
|
||||
temp_file.write_all(contents.as_ref())?;
|
||||
Ok(persist_tempfile(temp_file, file_path)?)
|
||||
}
|
||||
let mut temp_file = gix::tempfile::new(
|
||||
file_path.as_ref().parent().unwrap(),
|
||||
ContainingDirectory::Exists,
|
||||
AutoRemove::Tempfile,
|
||||
)?;
|
||||
temp_file.write_all(contents.as_ref())?;
|
||||
Ok(persist_tempfile(temp_file, file_path)?)
|
||||
}
|
||||
|
||||
/// Write a single file so that the write either fully succeeds, or fully fails,
|
||||
@ -81,27 +76,16 @@ pub(crate) fn create_dirs_then_write<P: AsRef<Path>>(
|
||||
file_path: P,
|
||||
contents: impl AsRef<[u8]>,
|
||||
) -> std::io::Result<()> {
|
||||
#[cfg(windows)]
|
||||
{
|
||||
let dir = file_path.as_ref().parent().unwrap();
|
||||
if !dir.exists() {
|
||||
std::fs::create_dir_all(dir)?;
|
||||
}
|
||||
std::fs::write(file_path, contents)
|
||||
}
|
||||
|
||||
#[cfg(not(windows))]
|
||||
{
|
||||
let mut temp_file = gix::tempfile::new(
|
||||
file_path.as_ref().parent().unwrap(),
|
||||
ContainingDirectory::CreateAllRaceProof(Retries::default()),
|
||||
AutoRemove::Tempfile,
|
||||
)?;
|
||||
temp_file.write_all(contents.as_ref())?;
|
||||
persist_tempfile(temp_file, file_path)
|
||||
}
|
||||
let mut temp_file = gix::tempfile::new(
|
||||
file_path.as_ref().parent().unwrap(),
|
||||
ContainingDirectory::CreateAllRaceProof(Retries::default()),
|
||||
AutoRemove::Tempfile,
|
||||
)?;
|
||||
temp_file.write_all(contents.as_ref())?;
|
||||
persist_tempfile(temp_file, file_path)
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
fn persist_tempfile(
|
||||
tempfile: gix::tempfile::Handle<gix::tempfile::handle::Writable>,
|
||||
to_path: impl AsRef<Path>,
|
||||
|
@ -91,7 +91,7 @@ unsafe impl super::GitExecutor for TokioExecutor {
|
||||
#[cfg(windows)]
|
||||
{
|
||||
cmd.envs(envs.iter().map(|(k, v)| {
|
||||
let v = v.replace("\\", "\\\\");
|
||||
let v = v.replace('\\', "\\\\");
|
||||
(k, v)
|
||||
}));
|
||||
}
|
||||
|
@ -213,6 +213,17 @@ fn main() {
|
||||
])
|
||||
.menu(menu::build(tauri_context.package_info()))
|
||||
.on_menu_event(|event|menu::handle_event(&event))
|
||||
.on_window_event(|event| {
|
||||
if let tauri::WindowEvent::Focused(focused) = event.event() {
|
||||
if *focused {
|
||||
tokio::task::spawn(async move {
|
||||
let _ = event.window().app_handle()
|
||||
.state::<watcher::Watchers>()
|
||||
.flush().await;
|
||||
});
|
||||
}
|
||||
}
|
||||
})
|
||||
.build(tauri_context)
|
||||
.expect("Failed to build tauri app")
|
||||
.run(|app_handle, event| {
|
||||
|
@ -121,6 +121,15 @@ impl Watchers {
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn flush(&self) -> Result<()> {
|
||||
let watcher = self.watcher.lock().await;
|
||||
if let Some(handle) = watcher.as_ref() {
|
||||
handle.flush()?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn stop(&self, project_id: ProjectId) {
|
||||
let mut handle = self.watcher.lock().await;
|
||||
if handle
|
||||
|
@ -19,7 +19,9 @@ git2.workspace = true
|
||||
|
||||
backoff = "0.4.0"
|
||||
notify = { version = "6.0.1" }
|
||||
notify-debouncer-full = "0.3.1"
|
||||
parking_lot = "0.12.1"
|
||||
file-id = "0.2.1"
|
||||
walkdir = "2.2.2"
|
||||
crossbeam-channel = "0.5.12"
|
||||
itertools = "0.12"
|
||||
|
||||
|
689
crates/gitbutler-watcher/src/debouncer.rs
Normal file
689
crates/gitbutler-watcher/src/debouncer.rs
Normal file
@ -0,0 +1,689 @@
|
||||
//! A debouncer for [notify] that is optimized for ease of use.
|
||||
//!
|
||||
//! * Only emits a single `Rename` event if the rename `From` and `To` events can be matched
|
||||
//! * Merges multiple `Rename` events
|
||||
//! * Takes `Rename` events into account and updates paths for events that occurred before the rename event, but which haven't been emitted, yet
|
||||
//! * Optionally keeps track of the file system IDs all files and stiches rename events together (FSevents, Windows)
|
||||
//! * Emits only one `Remove` event when deleting a directory (inotify)
|
||||
//! * Doesn't emit duplicate create events
|
||||
//! * Doesn't emit `Modify` events after a `Create` event
|
||||
//!
|
||||
//! # Installation
|
||||
//!
|
||||
//! ```toml
|
||||
//! [dependencies]
|
||||
//! notify-debouncer-full = "0.3.1"
|
||||
//! ```
|
||||
//!
|
||||
//! In case you want to select specific features of notify,
|
||||
//! specify notify as dependency explicitly in your dependencies.
|
||||
//! Otherwise you can just use the re-export of notify from debouncer-full.
|
||||
//!
|
||||
//! ```toml
|
||||
//! notify-debouncer-full = "0.3.1"
|
||||
//! notify = { version = "..", features = [".."] }
|
||||
//! ```
|
||||
//!
|
||||
//! # Examples
|
||||
//!
|
||||
//! ```rust,no_run
|
||||
//! # use std::path::Path;
|
||||
//! # use std::time::Duration;
|
||||
//! use notify_debouncer_full::{notify::*, new_debouncer, DebounceEventResult};
|
||||
//!
|
||||
//! // Select recommended watcher for debouncer.
|
||||
//! // Using a callback here, could also be a channel.
|
||||
//! let mut debouncer = new_debouncer(Duration::from_secs(2), None, |result: DebounceEventResult| {
|
||||
//! match result {
|
||||
//! Ok(events) => events.iter().for_each(|event| println!("{event:?}")),
|
||||
//! Err(errors) => errors.iter().for_each(|error| println!("{error:?}")),
|
||||
//! }
|
||||
//! }).unwrap();
|
||||
//!
|
||||
//! // Add a path to be watched. All files and directories at that path and
|
||||
//! // below will be monitored for changes.
|
||||
//! debouncer.watcher().watch(Path::new("."), RecursiveMode::Recursive).unwrap();
|
||||
//!
|
||||
//! // Add the same path to the file ID cache. The cache uses unique file IDs
|
||||
//! // provided by the file system and is used to stich together rename events
|
||||
//! // in case the notification back-end doesn't emit rename cookies.
|
||||
//! debouncer.cache().add_root(Path::new("."), RecursiveMode::Recursive);
|
||||
//! ```
|
||||
//!
|
||||
//! # Features
|
||||
//!
|
||||
//! The following crate features can be turned on or off in your cargo dependency config:
|
||||
//!
|
||||
//! - `crossbeam` enabled by default, adds [`DebounceEventHandler`](DebounceEventHandler) support for crossbeam channels.
|
||||
//! Also enables crossbeam-channel in the re-exported notify. You may want to disable this when using the tokio async runtime.
|
||||
//! - `serde` enables serde support for events.
|
||||
//!
|
||||
//! # Caveats
|
||||
//!
|
||||
//! As all file events are sourced from notify, the [known problems](https://docs.rs/notify/latest/notify/#known-problems) section applies here too.
|
||||
|
||||
use std::{
|
||||
collections::{HashMap, VecDeque},
|
||||
path::PathBuf,
|
||||
sync::{
|
||||
atomic::{AtomicBool, Ordering},
|
||||
Arc,
|
||||
},
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
|
||||
use file_id::FileId;
|
||||
use notify::{
|
||||
event::{ModifyKind, RemoveKind, RenameMode},
|
||||
Error, ErrorKind, Event, EventKind, RecommendedWatcher, Watcher,
|
||||
};
|
||||
use parking_lot::Mutex;
|
||||
|
||||
use crate::{
|
||||
debouncer_cache::{FileIdCache, FileIdMap},
|
||||
debouncer_event::DebouncedEvent,
|
||||
};
|
||||
|
||||
/// The set of requirements for watcher debounce event handling functions.
|
||||
///
|
||||
/// # Example implementation
|
||||
///
|
||||
/// ```rust,no_run
|
||||
/// # use notify::{Event, Result, EventHandler};
|
||||
/// # use notify_debouncer_full::{DebounceEventHandler, DebounceEventResult};
|
||||
///
|
||||
/// /// Prints received events
|
||||
/// struct EventPrinter;
|
||||
///
|
||||
/// impl DebounceEventHandler for EventPrinter {
|
||||
/// fn handle_event(&mut self, result: DebounceEventResult) {
|
||||
/// match result {
|
||||
/// Ok(events) => events.iter().for_each(|event| println!("{event:?}")),
|
||||
/// Err(errors) => errors.iter().for_each(|error| println!("{error:?}")),
|
||||
/// }
|
||||
/// }
|
||||
/// }
|
||||
/// ```
|
||||
pub trait DebounceEventHandler: Send + 'static {
|
||||
/// Handles an event.
|
||||
fn handle_event(&mut self, event: DebounceEventResult);
|
||||
}
|
||||
|
||||
impl<F> DebounceEventHandler for F
|
||||
where
|
||||
F: FnMut(DebounceEventResult) + Send + 'static,
|
||||
{
|
||||
fn handle_event(&mut self, event: DebounceEventResult) {
|
||||
(self)(event);
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "crossbeam")]
|
||||
impl DebounceEventHandler for crossbeam_channel::Sender<DebounceEventResult> {
|
||||
fn handle_event(&mut self, event: DebounceEventResult) {
|
||||
let _ = self.send(event);
|
||||
}
|
||||
}
|
||||
|
||||
impl DebounceEventHandler for std::sync::mpsc::Sender<DebounceEventResult> {
|
||||
fn handle_event(&mut self, event: DebounceEventResult) {
|
||||
let _ = self.send(event);
|
||||
}
|
||||
}
|
||||
|
||||
/// A result of debounced events.
|
||||
/// Comes with either a vec of events or vec of errors.
|
||||
pub type DebounceEventResult = Result<Vec<DebouncedEvent>, Vec<Error>>;
|
||||
|
||||
type DebounceData<T> = Arc<Mutex<DebounceDataInner<T>>>;
|
||||
|
||||
#[derive(Debug, Clone, Default, PartialEq, Eq)]
|
||||
struct Queue {
|
||||
/// Events must be stored in the following order:
|
||||
/// 1. `remove` or `move out` event
|
||||
/// 2. `rename` event
|
||||
/// 3. Other events
|
||||
events: VecDeque<DebouncedEvent>,
|
||||
}
|
||||
|
||||
impl Queue {
|
||||
fn was_created(&self) -> bool {
|
||||
self.events.front().map_or(false, |event| {
|
||||
matches!(
|
||||
event.kind,
|
||||
EventKind::Create(_) | EventKind::Modify(ModifyKind::Name(RenameMode::To))
|
||||
)
|
||||
})
|
||||
}
|
||||
|
||||
fn was_removed(&self) -> bool {
|
||||
self.events.front().map_or(false, |event| {
|
||||
matches!(
|
||||
event.kind,
|
||||
EventKind::Remove(_) | EventKind::Modify(ModifyKind::Name(RenameMode::From))
|
||||
)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct DebounceDataInner<T> {
|
||||
queues: HashMap<PathBuf, Queue>,
|
||||
cache: T,
|
||||
rename_event: Option<(DebouncedEvent, Option<FileId>)>,
|
||||
rescan_event: Option<DebouncedEvent>,
|
||||
errors: Vec<Error>,
|
||||
timeout: Duration,
|
||||
}
|
||||
|
||||
impl<T: FileIdCache> DebounceDataInner<T> {
|
||||
pub(crate) fn new(cache: T, timeout: Duration) -> Self {
|
||||
Self {
|
||||
queues: HashMap::new(),
|
||||
cache,
|
||||
rename_event: None,
|
||||
rescan_event: None,
|
||||
errors: Vec::new(),
|
||||
timeout,
|
||||
}
|
||||
}
|
||||
|
||||
/// Retrieve a vec of debounced events, removing them if not continuous
|
||||
pub fn debounced_events(&mut self, flush_all: bool) -> Vec<DebouncedEvent> {
|
||||
let now = Instant::now();
|
||||
let mut events_expired = Vec::with_capacity(self.queues.len());
|
||||
let mut queues_remaining = HashMap::with_capacity(self.queues.len());
|
||||
|
||||
if let Some(event) = self.rescan_event.take() {
|
||||
if now.saturating_duration_since(event.time) >= self.timeout {
|
||||
events_expired.push(event);
|
||||
} else {
|
||||
self.rescan_event = Some(event);
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: perfect fit for drain_filter https://github.com/rust-lang/rust/issues/59618
|
||||
for (path, mut queue) in self.queues.drain() {
|
||||
let mut kind_index = HashMap::new();
|
||||
|
||||
tracing::debug!("Checking path: {:?}", path);
|
||||
while let Some(event) = queue.events.pop_front() {
|
||||
if now.saturating_duration_since(event.time) >= self.timeout {
|
||||
// remove previous event of the same kind
|
||||
if let Some(idx) = kind_index.get(&event.kind).copied() {
|
||||
events_expired.remove(idx);
|
||||
|
||||
kind_index.values_mut().for_each(|i| {
|
||||
if *i > idx {
|
||||
*i -= 1
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
kind_index.insert(event.kind, events_expired.len());
|
||||
|
||||
events_expired.push(event);
|
||||
} else if flush_all {
|
||||
tracing::debug!("Flushing event! {:?}", event.event);
|
||||
events_expired.push(event);
|
||||
} else {
|
||||
queue.events.push_front(event);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if !queue.events.is_empty() {
|
||||
queues_remaining.insert(path, queue);
|
||||
}
|
||||
}
|
||||
|
||||
self.queues = queues_remaining;
|
||||
|
||||
// order events for different files chronologically, but keep the order of events for the same file
|
||||
events_expired.sort_by(|event_a, event_b| {
|
||||
// use the last path because rename events are emitted for the target path
|
||||
if event_a.paths.last() == event_b.paths.last() {
|
||||
std::cmp::Ordering::Equal
|
||||
} else {
|
||||
event_a.time.cmp(&event_b.time)
|
||||
}
|
||||
});
|
||||
|
||||
for event in &events_expired {
|
||||
tracing::debug!("Dispatching event: {:?}", event.event);
|
||||
}
|
||||
|
||||
events_expired
|
||||
}
|
||||
|
||||
/// Returns all currently stored errors
|
||||
pub fn errors(&mut self) -> Vec<Error> {
|
||||
let mut v = Vec::new();
|
||||
std::mem::swap(&mut v, &mut self.errors);
|
||||
v
|
||||
}
|
||||
|
||||
/// Add an error entry to re-send later on
|
||||
pub fn add_error(&mut self, error: Error) {
|
||||
self.errors.push(error);
|
||||
}
|
||||
|
||||
/// Add new event to debouncer cache
|
||||
pub fn add_event(&mut self, event: Event) {
|
||||
tracing::debug!("Received event: {:?}", event);
|
||||
|
||||
if event.need_rescan() {
|
||||
self.cache.rescan();
|
||||
self.rescan_event = Some(event.into());
|
||||
return;
|
||||
}
|
||||
|
||||
let path = &event.paths[0];
|
||||
|
||||
match &event.kind {
|
||||
EventKind::Create(_) => {
|
||||
self.cache.add_path(path);
|
||||
|
||||
self.push_event(event, Instant::now());
|
||||
}
|
||||
EventKind::Modify(ModifyKind::Name(rename_mode)) => {
|
||||
match rename_mode {
|
||||
RenameMode::Any => {
|
||||
if event.paths[0].exists() {
|
||||
self.handle_rename_to(event);
|
||||
} else {
|
||||
self.handle_rename_from(event);
|
||||
}
|
||||
}
|
||||
RenameMode::To => {
|
||||
self.handle_rename_to(event);
|
||||
}
|
||||
RenameMode::From => {
|
||||
self.handle_rename_from(event);
|
||||
}
|
||||
RenameMode::Both => {
|
||||
// ignore and handle `To` and `From` events instead
|
||||
}
|
||||
RenameMode::Other => {
|
||||
// unused
|
||||
}
|
||||
}
|
||||
}
|
||||
EventKind::Remove(_) => {
|
||||
self.push_remove_event(event, Instant::now());
|
||||
}
|
||||
EventKind::Other => {
|
||||
// ignore meta events
|
||||
}
|
||||
_ => {
|
||||
if self.cache.cached_file_id(path).is_none() {
|
||||
self.cache.add_path(path);
|
||||
}
|
||||
|
||||
self.push_event(event, Instant::now());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn handle_rename_from(&mut self, event: Event) {
|
||||
let time = Instant::now();
|
||||
let path = &event.paths[0];
|
||||
|
||||
// store event
|
||||
let file_id = self.cache.cached_file_id(path).cloned();
|
||||
self.rename_event = Some((DebouncedEvent::new(event.clone(), time), file_id));
|
||||
|
||||
self.cache.remove_path(path);
|
||||
|
||||
self.push_event(event, time);
|
||||
}
|
||||
|
||||
fn handle_rename_to(&mut self, event: Event) {
|
||||
self.cache.add_path(&event.paths[0]);
|
||||
|
||||
let trackers_match = self
|
||||
.rename_event
|
||||
.as_ref()
|
||||
.and_then(|(e, _)| e.tracker())
|
||||
.and_then(|from_tracker| {
|
||||
event
|
||||
.attrs
|
||||
.tracker()
|
||||
.map(|to_tracker| from_tracker == to_tracker)
|
||||
})
|
||||
.unwrap_or_default();
|
||||
|
||||
let file_ids_match = self
|
||||
.rename_event
|
||||
.as_ref()
|
||||
.and_then(|(_, id)| id.as_ref())
|
||||
.and_then(|from_file_id| {
|
||||
self.cache
|
||||
.cached_file_id(&event.paths[0])
|
||||
.map(|to_file_id| from_file_id == to_file_id)
|
||||
})
|
||||
.unwrap_or_default();
|
||||
|
||||
if trackers_match || file_ids_match {
|
||||
// connect rename
|
||||
let (mut rename_event, _) = self.rename_event.take().unwrap(); // unwrap is safe because `rename_event` must be set at this point
|
||||
let path = rename_event.paths.remove(0);
|
||||
let time = rename_event.time;
|
||||
self.push_rename_event(path, event, time);
|
||||
} else {
|
||||
// move in
|
||||
self.push_event(event, Instant::now());
|
||||
}
|
||||
|
||||
self.rename_event = None;
|
||||
}
|
||||
|
||||
fn push_rename_event(&mut self, path: PathBuf, event: Event, time: Instant) {
|
||||
self.cache.remove_path(&path);
|
||||
|
||||
let mut source_queue = self.queues.remove(&path).unwrap_or_default();
|
||||
|
||||
// remove rename `from` event
|
||||
source_queue.events.pop_back();
|
||||
|
||||
// remove existing rename event
|
||||
let (remove_index, original_path, original_time) = source_queue
|
||||
.events
|
||||
.iter()
|
||||
.enumerate()
|
||||
.find_map(|(index, e)| {
|
||||
if matches!(
|
||||
e.kind,
|
||||
EventKind::Modify(ModifyKind::Name(RenameMode::Both))
|
||||
) {
|
||||
Some((Some(index), e.paths[0].clone(), e.time))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
.unwrap_or((None, path, time));
|
||||
|
||||
if let Some(remove_index) = remove_index {
|
||||
source_queue.events.remove(remove_index);
|
||||
}
|
||||
|
||||
// split off remove or move out event and add it back to the events map
|
||||
if source_queue.was_removed() {
|
||||
let event = source_queue.events.pop_front().unwrap();
|
||||
|
||||
self.queues.insert(
|
||||
event.paths[0].clone(),
|
||||
Queue {
|
||||
events: [event].into(),
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
// update paths
|
||||
for e in &mut source_queue.events {
|
||||
e.paths = vec![event.paths[0].clone()];
|
||||
}
|
||||
|
||||
// insert rename event at the front, unless the file was just created
|
||||
if !source_queue.was_created() {
|
||||
source_queue.events.push_front(DebouncedEvent {
|
||||
event: Event {
|
||||
kind: EventKind::Modify(ModifyKind::Name(RenameMode::Both)),
|
||||
paths: vec![original_path, event.paths[0].clone()],
|
||||
attrs: event.attrs,
|
||||
},
|
||||
time: original_time,
|
||||
});
|
||||
}
|
||||
|
||||
if let Some(target_queue) = self.queues.get_mut(&event.paths[0]) {
|
||||
if !target_queue.was_created() {
|
||||
let mut remove_event = DebouncedEvent {
|
||||
event: Event {
|
||||
kind: EventKind::Remove(RemoveKind::Any),
|
||||
paths: vec![event.paths[0].clone()],
|
||||
attrs: Default::default(),
|
||||
},
|
||||
time: original_time,
|
||||
};
|
||||
if !target_queue.was_removed() {
|
||||
remove_event.event = remove_event.event.set_info("override");
|
||||
}
|
||||
source_queue.events.push_front(remove_event);
|
||||
}
|
||||
*target_queue = source_queue;
|
||||
} else {
|
||||
self.queues.insert(event.paths[0].clone(), source_queue);
|
||||
}
|
||||
}
|
||||
|
||||
fn push_remove_event(&mut self, event: Event, time: Instant) {
|
||||
let path = &event.paths[0];
|
||||
|
||||
// remove child queues
|
||||
self.queues.retain(|p, _| !p.starts_with(path) || p == path);
|
||||
|
||||
// remove cached file ids
|
||||
self.cache.remove_path(path);
|
||||
|
||||
match self.queues.get_mut(path) {
|
||||
Some(queue) if queue.was_created() => {
|
||||
self.queues.remove(path);
|
||||
}
|
||||
Some(queue) => {
|
||||
queue.events = [DebouncedEvent::new(event, time)].into();
|
||||
}
|
||||
None => {
|
||||
self.push_event(event, time);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn push_event(&mut self, event: Event, time: Instant) {
|
||||
let path = &event.paths[0];
|
||||
|
||||
if let Some(queue) = self.queues.get_mut(path) {
|
||||
// skip duplicate create events and modifications right after creation
|
||||
if match event.kind {
|
||||
EventKind::Modify(ModifyKind::Data(_) | ModifyKind::Metadata(_))
|
||||
| EventKind::Create(_) => !queue.was_created(),
|
||||
_ => true,
|
||||
} {
|
||||
queue.events.push_back(DebouncedEvent::new(event, time));
|
||||
}
|
||||
} else {
|
||||
self.queues.insert(
|
||||
path.to_path_buf(),
|
||||
Queue {
|
||||
events: [DebouncedEvent::new(event, time)].into(),
|
||||
},
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Debouncer guard, stops the debouncer on drop.
|
||||
#[derive(Debug)]
|
||||
pub struct Debouncer<T: Watcher, C: FileIdCache> {
|
||||
watcher: T,
|
||||
debouncer_thread: Option<std::thread::JoinHandle<()>>,
|
||||
#[allow(dead_code)]
|
||||
data: DebounceData<C>,
|
||||
stop: Arc<AtomicBool>,
|
||||
flush: Arc<AtomicBool>,
|
||||
}
|
||||
|
||||
impl<T: Watcher, C: FileIdCache> Debouncer<T, C> {
|
||||
/// Stop the debouncer, waits for the event thread to finish.
|
||||
/// May block for the duration of one tick_rate.
|
||||
#[allow(dead_code)]
|
||||
pub fn stop(mut self) {
|
||||
self.set_stop();
|
||||
if let Some(t) = self.debouncer_thread.take() {
|
||||
let _ = t.join();
|
||||
}
|
||||
}
|
||||
|
||||
/// Stop the debouncer, does not wait for the event thread to finish.
|
||||
#[allow(dead_code)]
|
||||
pub fn stop_nonblocking(self) {
|
||||
self.set_stop();
|
||||
}
|
||||
|
||||
fn set_stop(&self) {
|
||||
self.stop.store(true, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
/// Indicates that on the next tick of the debouncer thread, all events should be emitted.
|
||||
pub fn flush_nonblocking(&self) {
|
||||
self.flush.store(true, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
/// Access to the internally used notify Watcher backend
|
||||
pub fn watcher(&mut self) -> &mut T {
|
||||
&mut self.watcher
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Watcher, C: FileIdCache> Drop for Debouncer<T, C> {
|
||||
fn drop(&mut self) {
|
||||
self.set_stop();
|
||||
}
|
||||
}
|
||||
|
||||
/// Creates a new debounced watcher with custom configuration.
|
||||
///
|
||||
/// Timeout is the amount of time after which a debounced event is emitted.
|
||||
///
|
||||
/// If tick_rate is None, notify will select a tick rate that is 1/4 of the provided timeout.
|
||||
pub fn new_debouncer_opt<F: DebounceEventHandler, T: Watcher, C: FileIdCache + Send + 'static>(
|
||||
timeout: Duration,
|
||||
tick_rate: Option<Duration>,
|
||||
flush_after: Option<u32>,
|
||||
mut event_handler: F,
|
||||
file_id_cache: C,
|
||||
config: notify::Config,
|
||||
) -> Result<Debouncer<T, C>, Error> {
|
||||
let data = Arc::new(Mutex::new(DebounceDataInner::new(file_id_cache, timeout)));
|
||||
let stop = Arc::new(AtomicBool::new(false));
|
||||
let flush = Arc::new(AtomicBool::new(false));
|
||||
|
||||
let tick_div = 4;
|
||||
let tick = match tick_rate {
|
||||
Some(v) => {
|
||||
if v > timeout {
|
||||
return Err(Error::new(ErrorKind::Generic(format!(
|
||||
"Invalid tick_rate, tick rate {:?} > {:?} timeout!",
|
||||
v, timeout
|
||||
))));
|
||||
}
|
||||
v
|
||||
}
|
||||
None => timeout.checked_div(tick_div).ok_or_else(|| {
|
||||
Error::new(ErrorKind::Generic(format!(
|
||||
"Failed to calculate tick as {:?}/{}!",
|
||||
timeout, tick_div
|
||||
)))
|
||||
})?,
|
||||
};
|
||||
|
||||
let data_c = data.clone();
|
||||
let stop_c = stop.clone();
|
||||
let flush_c = flush.clone();
|
||||
let mut idle_count = 0;
|
||||
let mut prev_queue_count = 0;
|
||||
let thread = std::thread::Builder::new()
|
||||
.name("notify-rs debouncer loop".to_string())
|
||||
.spawn(move || loop {
|
||||
if stop_c.load(Ordering::Acquire) {
|
||||
break;
|
||||
}
|
||||
|
||||
let mut should_flush = flush_c.load(Ordering::Acquire);
|
||||
|
||||
std::thread::sleep(tick);
|
||||
|
||||
let send_data;
|
||||
let errors;
|
||||
{
|
||||
let mut lock = data_c.lock();
|
||||
|
||||
let queue_count = lock.queues.values().fold(0, |acc, x| acc + x.events.len());
|
||||
if prev_queue_count == queue_count {
|
||||
idle_count += 1;
|
||||
} else {
|
||||
prev_queue_count = queue_count
|
||||
}
|
||||
|
||||
if let Some(threshold) = flush_after {
|
||||
if idle_count >= threshold {
|
||||
idle_count = 0;
|
||||
prev_queue_count = 0;
|
||||
should_flush = true;
|
||||
}
|
||||
}
|
||||
|
||||
send_data = lock.debounced_events(should_flush);
|
||||
if should_flush {
|
||||
flush_c.store(false, Ordering::Release);
|
||||
}
|
||||
|
||||
errors = lock.errors();
|
||||
}
|
||||
if !send_data.is_empty() {
|
||||
if should_flush {
|
||||
tracing::debug!("Flushed {} events", send_data.len());
|
||||
}
|
||||
|
||||
event_handler.handle_event(Ok(send_data));
|
||||
}
|
||||
if !errors.is_empty() {
|
||||
event_handler.handle_event(Err(errors));
|
||||
}
|
||||
})?;
|
||||
|
||||
let data_c = data.clone();
|
||||
let watcher = T::new(
|
||||
move |e: Result<Event, Error>| {
|
||||
let mut lock = data_c.lock();
|
||||
|
||||
match e {
|
||||
Ok(e) => lock.add_event(e),
|
||||
// can't have multiple TX, so we need to pipe that through our debouncer
|
||||
Err(e) => lock.add_error(e),
|
||||
}
|
||||
},
|
||||
config,
|
||||
)?;
|
||||
|
||||
let guard = Debouncer {
|
||||
watcher,
|
||||
debouncer_thread: Some(thread),
|
||||
data,
|
||||
stop,
|
||||
flush,
|
||||
};
|
||||
|
||||
Ok(guard)
|
||||
}
|
||||
|
||||
/// Short function to create a new debounced watcher with the recommended debouncer and the built-in file ID cache.
|
||||
///
|
||||
/// Timeout is the amount of time after which a debounced event is emitted.
|
||||
///
|
||||
/// If tick_rate is None, notify will select a tick rate that is 1/4 of the provided timeout.
|
||||
pub fn new_debouncer<F: DebounceEventHandler>(
|
||||
timeout: Duration,
|
||||
tick_rate: Option<Duration>,
|
||||
flush_after: Option<u32>,
|
||||
event_handler: F,
|
||||
) -> Result<Debouncer<RecommendedWatcher, FileIdMap>, Error> {
|
||||
new_debouncer_opt::<F, RecommendedWatcher, FileIdMap>(
|
||||
timeout,
|
||||
tick_rate,
|
||||
flush_after,
|
||||
event_handler,
|
||||
FileIdMap::new(),
|
||||
notify::Config::default(),
|
||||
)
|
||||
}
|
142
crates/gitbutler-watcher/src/debouncer_cache.rs
Normal file
142
crates/gitbutler-watcher/src/debouncer_cache.rs
Normal file
@ -0,0 +1,142 @@
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
path::{Path, PathBuf},
|
||||
};
|
||||
|
||||
use file_id::{get_file_id, FileId};
|
||||
use notify::RecursiveMode;
|
||||
use walkdir::WalkDir;
|
||||
|
||||
/// The interface of a file ID cache.
|
||||
///
|
||||
/// This trait can be implemented for an existing cache, if it already holds `FileId`s.
|
||||
pub trait FileIdCache {
|
||||
/// Get a `FileId` from the cache for a given `path`.
|
||||
///
|
||||
/// If the path is not cached, `None` should be returned and there should not be any attempt to read the file ID from disk.
|
||||
fn cached_file_id(&self, path: &Path) -> Option<&FileId>;
|
||||
|
||||
/// Add a new path to the cache or update its value.
|
||||
///
|
||||
/// This will be called if a new file or directory is created or if an existing file is overridden.
|
||||
fn add_path(&mut self, path: &Path);
|
||||
|
||||
/// Remove a path from the cache.
|
||||
///
|
||||
/// This will be called if a file or directory is deleted.
|
||||
fn remove_path(&mut self, path: &Path);
|
||||
|
||||
/// Re-scan all paths.
|
||||
///
|
||||
/// This will be called if the notification back-end has dropped events.
|
||||
fn rescan(&mut self);
|
||||
}
|
||||
|
||||
/// A cache to hold the file system IDs of all watched files.
|
||||
///
|
||||
/// The file ID cache uses unique file IDs provided by the file system and is used to stich together
|
||||
/// rename events in case the notification back-end doesn't emit rename cookies.
|
||||
#[derive(Debug, Clone, Default)]
|
||||
pub struct FileIdMap {
|
||||
paths: HashMap<PathBuf, FileId>,
|
||||
roots: Vec<(PathBuf, RecursiveMode)>,
|
||||
}
|
||||
|
||||
impl FileIdMap {
|
||||
/// Construct an empty cache.
|
||||
pub fn new() -> Self {
|
||||
Default::default()
|
||||
}
|
||||
|
||||
/// Add a path to the cache.
|
||||
///
|
||||
/// If `recursive_mode` is `Recursive`, all children will be added to the cache as well
|
||||
/// and all paths will be kept up-to-date in case of changes like new files being added,
|
||||
/// files being removed or renamed.
|
||||
#[allow(dead_code)]
|
||||
pub fn add_root(&mut self, path: impl Into<PathBuf>, recursive_mode: RecursiveMode) {
|
||||
let path = path.into();
|
||||
|
||||
self.roots.push((path.clone(), recursive_mode));
|
||||
|
||||
self.add_path(&path);
|
||||
}
|
||||
|
||||
/// Remove a path form the cache.
|
||||
///
|
||||
/// If the path was added with `Recursive` mode, all children will also be removed from the cache.
|
||||
#[allow(dead_code)]
|
||||
pub fn remove_root(&mut self, path: impl AsRef<Path>) {
|
||||
self.roots.retain(|(root, _)| !root.starts_with(&path));
|
||||
|
||||
self.remove_path(path.as_ref());
|
||||
}
|
||||
|
||||
fn dir_scan_depth(is_recursive: bool) -> usize {
|
||||
if is_recursive {
|
||||
usize::max_value()
|
||||
} else {
|
||||
1
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl FileIdCache for FileIdMap {
|
||||
fn cached_file_id(&self, path: &Path) -> Option<&FileId> {
|
||||
self.paths.get(path)
|
||||
}
|
||||
|
||||
fn add_path(&mut self, path: &Path) {
|
||||
let is_recursive = self
|
||||
.roots
|
||||
.iter()
|
||||
.find_map(|(root, recursive_mode)| {
|
||||
if path.starts_with(root) {
|
||||
Some(*recursive_mode == RecursiveMode::Recursive)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
.unwrap_or_default();
|
||||
|
||||
for (path, file_id) in WalkDir::new(path)
|
||||
.follow_links(true)
|
||||
.max_depth(Self::dir_scan_depth(is_recursive))
|
||||
.into_iter()
|
||||
.filter_map(|entry| {
|
||||
let path = entry.ok()?.into_path();
|
||||
let file_id = get_file_id(&path).ok()?;
|
||||
Some((path, file_id))
|
||||
})
|
||||
{
|
||||
self.paths.insert(path, file_id);
|
||||
}
|
||||
}
|
||||
|
||||
fn remove_path(&mut self, path: &Path) {
|
||||
self.paths.retain(|p, _| !p.starts_with(path));
|
||||
}
|
||||
|
||||
fn rescan(&mut self) {
|
||||
for (root, _) in self.roots.clone() {
|
||||
self.add_path(&root);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// An implementation of the `FileIdCache` trait that doesn't hold any data.
|
||||
///
|
||||
/// This pseudo cache can be used to disable the file tracking using file system IDs.
|
||||
pub struct NoCache;
|
||||
|
||||
impl FileIdCache for NoCache {
|
||||
fn cached_file_id(&self, _path: &Path) -> Option<&FileId> {
|
||||
None
|
||||
}
|
||||
|
||||
fn add_path(&mut self, _path: &Path) {}
|
||||
|
||||
fn remove_path(&mut self, _path: &Path) {}
|
||||
|
||||
fn rescan(&mut self) {}
|
||||
}
|
53
crates/gitbutler-watcher/src/debouncer_event.rs
Normal file
53
crates/gitbutler-watcher/src/debouncer_event.rs
Normal file
@ -0,0 +1,53 @@
|
||||
use std::ops::{Deref, DerefMut};
|
||||
|
||||
use std::time::Instant;
|
||||
|
||||
use notify::Event;
|
||||
|
||||
/// A debounced event is emitted after a short delay.
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub struct DebouncedEvent {
|
||||
/// The original event.
|
||||
pub event: Event,
|
||||
|
||||
/// The time at which the event occurred.
|
||||
pub time: Instant,
|
||||
}
|
||||
|
||||
impl DebouncedEvent {
|
||||
pub fn new(event: Event, time: Instant) -> Self {
|
||||
Self { event, time }
|
||||
}
|
||||
}
|
||||
|
||||
impl Deref for DebouncedEvent {
|
||||
type Target = Event;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.event
|
||||
}
|
||||
}
|
||||
|
||||
impl DerefMut for DebouncedEvent {
|
||||
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||
&mut self.event
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for DebouncedEvent {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
event: Default::default(),
|
||||
time: Instant::now(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<Event> for DebouncedEvent {
|
||||
fn from(event: Event) -> Self {
|
||||
Self {
|
||||
event,
|
||||
time: Instant::now(),
|
||||
}
|
||||
}
|
||||
}
|
@ -1,19 +1,31 @@
|
||||
use std::collections::HashSet;
|
||||
use std::path::Path;
|
||||
use std::time::Duration;
|
||||
use std::{collections::HashSet, sync::Arc};
|
||||
|
||||
use crate::events::InternalEvent;
|
||||
use crate::debouncer::Debouncer;
|
||||
use crate::debouncer_cache::FileIdMap;
|
||||
use crate::{debouncer::new_debouncer, events::InternalEvent};
|
||||
use anyhow::{anyhow, Context, Result};
|
||||
use gitbutler_core::ops::OPLOG_FILE_NAME;
|
||||
use gitbutler_core::{git, projects::ProjectId};
|
||||
use notify::Watcher;
|
||||
use notify_debouncer_full::new_debouncer;
|
||||
use notify::{RecommendedWatcher, Watcher};
|
||||
use tokio::task;
|
||||
use tracing::Level;
|
||||
|
||||
use gitbutler_core::ops::OPLOG_FILE_NAME;
|
||||
/// The timeout for debouncing file change events.
|
||||
/// This is used to prevent multiple events from being sent for a single file change.
|
||||
const DEBOUNCE_TIMEOUT: Duration = Duration::from_millis(100);
|
||||
/// We will collect notifications for up to this amount of time at a very
|
||||
/// maximum before releasing them. This duration will be hit if e.g. a build
|
||||
/// is constantly running and producing a lot of file changes, we will process
|
||||
/// them even if the build is still running.
|
||||
const DEBOUNCE_TIMEOUT: Duration = Duration::from_secs(60);
|
||||
|
||||
// The internal rate at which the debouncer will update its state.
|
||||
const TICK_RATE: Duration = Duration::from_millis(250);
|
||||
|
||||
// The number of TICK_RATE intervals required of "dead air" (i.e. no new events
|
||||
// arriving) before we will automatically flush pending events. This means that
|
||||
// after the disk is quiet for TICK_RATE * FLUSH_AFTER_EMPTY, we will process
|
||||
// the pending events, even if DEBOUNCE_TIMEOUT hasn't expired yet
|
||||
const FLUSH_AFTER_EMPTY: u32 = 3;
|
||||
|
||||
/// This error is required only because `anyhow::Error` isn't implementing `std::error::Error`, and [`spawn()`]
|
||||
/// needs to wrap it into a `backoff::Error` which also has to implement the `Error` trait.
|
||||
@ -43,10 +55,17 @@ pub fn spawn(
|
||||
project_id: ProjectId,
|
||||
worktree_path: &std::path::Path,
|
||||
out: tokio::sync::mpsc::UnboundedSender<InternalEvent>,
|
||||
) -> Result<()> {
|
||||
) -> Result<Arc<Debouncer<RecommendedWatcher, FileIdMap>>> {
|
||||
let (notify_tx, notify_rx) = std::sync::mpsc::channel();
|
||||
let mut debouncer =
|
||||
new_debouncer(DEBOUNCE_TIMEOUT, None, notify_tx).context("failed to create debouncer")?;
|
||||
let mut debouncer = Arc::new(
|
||||
new_debouncer(
|
||||
DEBOUNCE_TIMEOUT,
|
||||
Some(TICK_RATE),
|
||||
Some(FLUSH_AFTER_EMPTY),
|
||||
notify_tx,
|
||||
)
|
||||
.context("failed to create debouncer")?,
|
||||
);
|
||||
|
||||
let policy = backoff::ExponentialBackoffBuilder::new()
|
||||
.with_max_elapsed_time(Some(std::time::Duration::from_secs(30)))
|
||||
@ -54,7 +73,8 @@ pub fn spawn(
|
||||
|
||||
// Start the watcher, but retry if there are transient errors.
|
||||
backoff::retry(policy, || {
|
||||
debouncer
|
||||
Arc::get_mut(&mut debouncer)
|
||||
.expect("")
|
||||
.watcher()
|
||||
.watch(worktree_path, notify::RecursiveMode::Recursive)
|
||||
.map_err(|err| match err.kind {
|
||||
@ -69,11 +89,14 @@ pub fn spawn(
|
||||
})
|
||||
.context("failed to start watcher")?;
|
||||
|
||||
let ret = Ok(debouncer.clone());
|
||||
|
||||
let worktree_path = worktree_path.to_owned();
|
||||
task::spawn_blocking(move || {
|
||||
tracing::debug!(%project_id, "file watcher started");
|
||||
let _debouncer = debouncer;
|
||||
let _runtime = tracing::span!(Level::INFO, "file monitor", %project_id ).entered();
|
||||
let _debouncer = debouncer.clone();
|
||||
|
||||
'outer: for result in notify_rx {
|
||||
let stats = tracing::span!(
|
||||
Level::INFO,
|
||||
@ -181,7 +204,8 @@ pub fn spawn(
|
||||
}
|
||||
}
|
||||
});
|
||||
Ok(())
|
||||
|
||||
ret
|
||||
}
|
||||
|
||||
#[cfg(target_family = "unix")]
|
||||
|
@ -7,8 +7,12 @@ mod events;
|
||||
use events::InternalEvent;
|
||||
pub use events::{Action, Change};
|
||||
|
||||
mod debouncer;
|
||||
mod debouncer_cache;
|
||||
mod debouncer_event;
|
||||
mod file_monitor;
|
||||
mod handler;
|
||||
|
||||
pub use handler::Handler;
|
||||
|
||||
use std::path::Path;
|
||||
@ -27,6 +31,7 @@ pub struct WatcherHandle {
|
||||
tx: UnboundedSender<InternalEvent>,
|
||||
/// The id of the project we are watching.
|
||||
project_id: ProjectId,
|
||||
signal_flush: UnboundedSender<()>,
|
||||
/// A way to tell the background process to stop handling events.
|
||||
cancellation_token: CancellationToken,
|
||||
}
|
||||
@ -50,6 +55,11 @@ impl WatcherHandle {
|
||||
pub fn project_id(&self) -> ProjectId {
|
||||
self.project_id
|
||||
}
|
||||
|
||||
pub fn flush(&self) -> Result<()> {
|
||||
self.signal_flush.send(())?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Run our file watcher processing loop in the background and let `handler` deal with them.
|
||||
@ -73,13 +83,15 @@ pub fn watch_in_background(
|
||||
project_id: ProjectId,
|
||||
) -> Result<WatcherHandle, anyhow::Error> {
|
||||
let (events_out, mut events_in) = unbounded_channel();
|
||||
let (flush_tx, mut flush_rx) = unbounded_channel();
|
||||
|
||||
file_monitor::spawn(project_id, path.as_ref(), events_out.clone())?;
|
||||
let debounce = file_monitor::spawn(project_id, path.as_ref(), events_out.clone())?;
|
||||
|
||||
let cancellation_token = CancellationToken::new();
|
||||
let handle = WatcherHandle {
|
||||
tx: events_out,
|
||||
project_id,
|
||||
signal_flush: flush_tx,
|
||||
cancellation_token: cancellation_token.clone(),
|
||||
};
|
||||
let handle_event = move |event: InternalEvent| -> Result<()> {
|
||||
@ -100,6 +112,9 @@ pub fn watch_in_background(
|
||||
loop {
|
||||
tokio::select! {
|
||||
Some(event) = events_in.recv() => handle_event(event)?,
|
||||
Some(_signal_flush) = flush_rx.recv() => {
|
||||
debounce.flush_nonblocking();
|
||||
}
|
||||
() = cancellation_token.cancelled() => {
|
||||
break;
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user