add more listeners

This commit is contained in:
Nikita Galaiko 2023-04-14 12:25:07 +02:00
parent 6988aef8e6
commit 6f4336ff4c
17 changed files with 528 additions and 271 deletions

View File

@ -1,25 +1,24 @@
use std::{
path::PathBuf,
sync::{self, Arc, Mutex},
sync::{Arc, Mutex},
};
use crate::projects;
use anyhow::Result;
use notify::{Config, RecommendedWatcher, Watcher};
#[derive(Debug, Clone)]
pub struct Dispatcher {
watcher: Arc<Mutex<Option<RecommendedWatcher>>>,
project_path: String,
project_path: PathBuf,
project_id: String,
}
impl Dispatcher {
pub fn new(project: &projects::Project) -> Self {
pub fn new<P: AsRef<std::path::Path>>(project_id: String, path: P) -> Self {
Self {
watcher: Arc::new(Mutex::new(None)),
project_path: project.path.clone(),
project_id: project.id.clone(),
project_path: path.as_ref().to_path_buf(),
project_id,
}
}

View File

@ -1,8 +1,8 @@
use crate::projects;
use anyhow::{Result, Context};
use crossbeam_channel::{bounded, select, tick, Receiver, Sender};
use std::time;
use anyhow::Result;
use crossbeam_channel::{bounded, select, tick, Receiver, Sender};
#[derive(Debug, Clone)]
pub struct Dispatcher {
project_id: String,
@ -10,9 +10,9 @@ pub struct Dispatcher {
}
impl Dispatcher {
pub fn new(project: &projects::Project) -> Self {
pub fn new(project_id: String) -> Self {
Self {
project_id: project.id.clone(),
project_id,
stop: bounded(1),
}
}
@ -22,19 +22,21 @@ impl Dispatcher {
Ok(())
}
pub fn start(&self, interval: time::Duration, rtx: Sender<time::Instant>) -> Result<()> {
log::info!("{}: ticker started", self.project_id);
pub fn start(&self, interval: time::Duration, rtx: Sender<time::SystemTime>) -> Result<()> {
let update = tick(interval);
log::info!("{}: ticker started", self.project_id);
loop {
select! {
recv(update) -> ts => {
let ts = ts.context("failed to receive tick event")?;
if let Err(e) = rtx.send(ts) {
log::error!("{}: failed to send tick event: {:#}", self.project_id, e);
}
}
recv(update) -> ts => match ts {
Ok(_) => {
if let Err(e) = rtx.send(time::SystemTime::now()) {
log::error!("{}: failed to send tick event: {:#}", self.project_id, e);
}
},
Err(e) => log::error!("{}: failed to receive tick event: {:#}", self.project_id, e)
},
recv(self.stop.1) -> _ => {
break;
}

View File

@ -1,19 +1,20 @@
use super::{reader, writer};
use crate::{projects, sessions};
use anyhow::{anyhow, Context, Ok, Result};
use crate::sessions;
use super::{reader, writer};
pub struct Repository {
pub(crate) project_id: String,
git_repository: git2::Repository,
}
impl Repository {
pub fn open<P: AsRef<std::path::Path>>(root: P, project: &projects::Project) -> Result<Self> {
let root = root.as_ref();
let path = root.join(&project.id);
pub fn open<P: AsRef<std::path::Path>>(root: P, project_id: String) -> Result<Self> {
let path = root.as_ref().join(project_id.clone());
let git_repository = if path.exists() {
git2::Repository::open(path)
.with_context(|| format!("{}: failed to open git repository", project.path))?
git2::Repository::open(path.clone())
.with_context(|| format!("{}: failed to open git repository", path.display()))?
} else {
// TODO: flush first session instead
@ -23,7 +24,7 @@ impl Repository {
.initial_head("refs/heads/current")
.external_template(false),
)
.with_context(|| format!("{}: failed to initialize git repository", project.path))?;
.with_context(|| format!("{}: failed to initialize git repository", path.display()))?;
let mut index = git_repository.index()?;
let oid = index.write_tree()?;
@ -40,7 +41,7 @@ impl Repository {
git_repository
};
Ok(Self {
project_id: project.id.clone(),
project_id,
git_repository,
})
}

View File

@ -0,0 +1,46 @@
use std::time;
use anyhow::{Context, Result};
use crate::app::gb_repository;
pub struct Listener<'listener> {
gb_repository: &'listener gb_repository::Repository,
}
const FIVE_MINUTES: u128 = time::Duration::new(5 * 60, 0).as_millis();
const ONE_HOUR: u128 = time::Duration::new(60 * 60, 0).as_millis();
impl<'listener> Listener<'listener> {
pub fn new(gb_repository: &'listener gb_repository::Repository) -> Self {
Self { gb_repository }
}
pub fn register(&self, ts: time::SystemTime) -> Result<()> {
let current_session = self
.gb_repository
.get_current_session()
.context("failed to get current session")?;
if current_session.is_none() {
return Ok(());
}
let current_session = current_session.unwrap();
let now = ts
.duration_since(time::SystemTime::UNIX_EPOCH)
.unwrap()
.as_millis();
let elapsed_last = now - current_session.meta.last_timestamp_ms;
if elapsed_last < FIVE_MINUTES {
return Ok(());
}
let elapsed_start = now - current_session.meta.start_timestamp_ms;
if elapsed_start < ONE_HOUR {
return Ok(());
}
Ok(())
}
}

View File

@ -1,156 +1,57 @@
use crate::{
app::{
gb_repository, project_repository,
reader::{self, Reader},
writer::Writer,
},
deltas, projects,
};
use std::sync;
use anyhow::{Context, Result};
use crate::{app::gb_repository, events, projects};
use super::{git_file_change, project_file_change};
pub struct Listener<'listener> {
project_id: String,
project_repository: &'listener project_repository::Repository,
gb_repository: &'listener gb_repository::Repository,
project_file_change_listener: project_file_change::Listener<'listener>,
git_file_change_listener: git_file_change::Listener,
}
impl<'listener> Listener<'listener> {
pub fn new(
project: &projects::Project,
project_repository: &'listener project_repository::Repository,
project_id: String,
project_store: projects::Storage,
gb_repository: &'listener gb_repository::Repository,
events: sync::mpsc::Sender<events::Event>,
) -> Self {
Self {
project_id: project.id.clone(),
project_repository,
gb_repository,
}
}
fn get_current_file_content(&self, path: &std::path::Path) -> Result<Option<String>> {
if self.project_repository.is_path_ignored(path)? {
return Ok(None);
}
let reader = self.project_repository.get_wd_reader();
let path = path.to_str().unwrap();
if reader.size(path)? > 100_000 {
log::warn!("{}: ignoring large file: {}", self.project_id, path);
return Ok(None);
}
match reader.read(path)? {
reader::Content::UTF8(content) => Ok(Some(content)),
reader::Content::Binary(_) => {
log::warn!("{}: ignoring non-utf8 file: {}", self.project_id, path);
return Ok(None);
}
}
}
fn get_latest_file_contents(&self, path: &std::path::Path) -> Result<Option<String>> {
let path = path.to_str().unwrap();
let gb_head_reader = self
.gb_repository
.get_head_reader()
.with_context(|| "failed to get gb head reader")?;
let project_head_reader = self
.project_repository
.get_head_reader()
.with_context(|| "failed to get project head reader")?;
let reader = if gb_head_reader.exists(path) {
gb_head_reader
} else if project_head_reader.exists(path) {
project_head_reader
} else {
return Ok(None);
};
if reader.size(path)? > 100_000 {
log::warn!("{}: ignoring large file: {}", self.project_id, path);
return Ok(None);
}
match reader.read(path)? {
reader::Content::UTF8(content) => Ok(Some(content)),
reader::Content::Binary(_) => {
log::warn!("{}: ignoring non-utf8 file: {}", self.project_id, path);
return Ok(None);
}
}
}
fn get_current_deltas(&self, path: &std::path::Path) -> Result<Option<Vec<deltas::Delta>>> {
let reader = self.gb_repository.get_wd_reader();
let deltas_path = self.gb_repository.deltas_path().join(path);
match reader.read_to_string(deltas_path.to_str().unwrap()) {
Ok(content) => Ok(Some(serde_json::from_str(&content)?)),
Err(reader::Error::NotFound) => Ok(None),
Err(err) => Err(err.into()),
project_file_change_listener: project_file_change::Listener::new(
project_id.clone(),
project_store.clone(),
gb_repository,
),
git_file_change_listener: git_file_change::Listener::new(
project_id,
project_store,
events,
),
}
}
pub fn register<P: AsRef<std::path::Path>>(&self, path: P) -> Result<()> {
let path = path.as_ref();
let current_file_content = match self
.get_current_file_content(&path)
.with_context(|| "failed to get current file content")?
{
Some(content) => content,
None => return Ok(()),
};
let latest_file_content = self
.get_latest_file_contents(&path)
.with_context(|| "failed to get latest file content")?;
let current_deltas = self
.get_current_deltas(&path)
.with_context(|| "failed to get current deltas")?;
let mut text_doc = match (latest_file_content, current_deltas) {
(Some(latest_contents), Some(deltas)) => {
deltas::TextDocument::new(Some(&latest_contents), deltas)?
}
(Some(latest_contents), None) => {
deltas::TextDocument::new(Some(&latest_contents), vec![])?
}
(None, Some(deltas)) => deltas::TextDocument::new(None, deltas)?,
(None, None) => deltas::TextDocument::new(None, vec![])?,
};
if !text_doc.update(&current_file_content)? {
log::debug!(
"{}: {} no new deltas, ignoring",
self.project_id,
path.display()
);
return Ok(());
if !path.starts_with(".git") {
self.project_file_change_listener
.register(&path)
.with_context(|| {
format!(
"failed to register project file change for path: {}",
path.display()
)
})
} else {
self.git_file_change_listener
.register(path)
.with_context(|| {
format!(
"failed to register git file change for path: {}",
path.display()
)
})
}
log::info!("{}: {} changed", self.project_id, path.display());
let writer = self.gb_repository.get_wd_writer();
// save current deltas
writer
.write_string(
self.gb_repository
.deltas_path()
.join(path)
.to_str()
.unwrap(),
&serde_json::to_string(&text_doc.get_deltas())?,
)
.with_context(|| "failed to write deltas")?;
// save file contents corresponding to the deltas
writer
.write_string(
self.gb_repository.wd_path().join(path).to_str().unwrap(),
&current_file_content,
)
.with_context(|| "failed to write file content")?;
Ok(())
}
}

View File

@ -0,0 +1,53 @@
use std::{sync, time};
use anyhow::{anyhow, Context, Result};
use crate::{app::gb_repository, events, projects, sessions, users};
pub struct Listener<'listener> {
project_id: String,
user_store: &'listener users::Storage,
gb_repository: &'listener gb_repository::Repository,
sender: sync::mpsc::Sender<events::Event>,
}
impl<'listener> Listener<'listener> {
pub fn new(
project: &projects::Project,
user_store: &'listener users::Storage,
gb_repository: &'listener gb_repository::Repository,
sender: sync::mpsc::Sender<events::Event>,
) -> Self {
Self {
project_id: project.id.clone(),
user_store,
gb_repository,
sender,
}
}
pub fn register(&self, session: &sessions::Session) -> Result<()> {
let session = sessions::Session {
id: session.id.clone(),
hash: session.hash.clone(),
activity: session.activity.clone(),
meta: sessions::Meta {
last_timestamp_ms: time::SystemTime::now()
.duration_since(time::SystemTime::UNIX_EPOCH)?
.as_millis(),
..session.meta.clone()
},
};
let user = self.user_store.get().context("failed to get user")?;
self.flush(user, &session)
.context("failed to flush session")?;
Ok(())
}
fn flush(&self, user: Option<users::User>, session: &sessions::Session) -> Result<()> {
Err(anyhow!("not implemented"))
}
}

View File

@ -0,0 +1,67 @@
use std::sync;
use anyhow::{Context, Result};
use crate::{app::project_repository, events, projects};
pub struct Listener {
project_id: String,
project_store: projects::Storage,
events: sync::mpsc::Sender<events::Event>,
}
impl Listener {
pub fn new(
project_id: String,
project_store: projects::Storage,
events: sync::mpsc::Sender<events::Event>,
) -> Self {
Self {
project_id,
project_store,
events,
}
}
pub fn register<P: AsRef<std::path::Path>>(&self, path: P) -> Result<()> {
let project = self
.project_store
.get_project(&self.project_id)
.with_context(|| "failed to get project")?;
if project.is_none() {
return Err(anyhow::anyhow!("project not found"));
}
let project = project.unwrap();
let project_repository = project_repository::Repository::open(&project)
.with_context(|| "failed to open project repository for project")?;
let path = path.as_ref().to_str().unwrap();
let event = if path.eq(".git/logs/HEAD") {
log::info!("{}: git activity", project.id);
Some(events::Event::git_activity(&project))
} else if path.eq(".git/HEAD") {
log::info!("{}: git head changed", project.id);
let head_ref = project_repository.head()?;
if let Some(head) = head_ref.name() {
Some(events::Event::git_head(&project, &head))
} else {
None
}
} else if path.eq(".git/index") {
log::info!("{}: git index changed", project.id);
Some(events::Event::git_index(&project))
} else {
None
};
if let Some(event) = event {
self.events
.send(event)
.with_context(|| "failed to send git event")?;
}
Ok(())
}
}

View File

@ -1,4 +1,8 @@
pub mod file_change;
mod git_file_change;
mod project_file_change;
pub mod check_current_session;
mod flush_session;
#[cfg(test)]
mod file_change_tests;
mod project_file_change_tests;

View File

@ -0,0 +1,177 @@
use anyhow::{Context, Result};
use crate::{
app::{
gb_repository, project_repository,
reader::{self, Reader},
writer::Writer,
},
deltas, projects,
};
pub struct Listener<'listener> {
project_id: String,
project_store: projects::Storage,
gb_repository: &'listener gb_repository::Repository,
}
impl<'listener> Listener<'listener> {
pub fn new(
project_id: String,
project_store: projects::Storage,
gb_repository: &'listener gb_repository::Repository,
) -> Self {
Self {
project_id,
project_store,
gb_repository,
}
}
fn get_current_file_content(
&self,
project_repository: &project_repository::Repository,
path: &std::path::Path,
) -> Result<Option<String>> {
if project_repository.is_path_ignored(path)? {
return Ok(None);
}
let reader = project_repository.get_wd_reader();
let path = path.to_str().unwrap();
if reader.size(path)? > 100_000 {
log::warn!("{}: ignoring large file: {}", self.project_id, path);
return Ok(None);
}
match reader.read(path)? {
reader::Content::UTF8(content) => Ok(Some(content)),
reader::Content::Binary(_) => {
log::warn!("{}: ignoring non-utf8 file: {}", self.project_id, path);
return Ok(None);
}
}
}
fn get_latest_file_contents(
&self,
project_repository: &project_repository::Repository,
path: &std::path::Path,
) -> Result<Option<String>> {
let path = path.to_str().unwrap();
let gb_head_reader = self
.gb_repository
.get_head_reader()
.with_context(|| "failed to get gb head reader")?;
let project_head_reader = project_repository
.get_head_reader()
.with_context(|| "failed to get project head reader")?;
let reader = if gb_head_reader.exists(path) {
gb_head_reader
} else if project_head_reader.exists(path) {
project_head_reader
} else {
return Ok(None);
};
if reader.size(path)? > 100_000 {
log::warn!("{}: ignoring large file: {}", self.project_id, path);
return Ok(None);
}
match reader.read(path)? {
reader::Content::UTF8(content) => Ok(Some(content)),
reader::Content::Binary(_) => {
log::warn!("{}: ignoring non-utf8 file: {}", self.project_id, path);
return Ok(None);
}
}
}
fn get_current_deltas(&self, path: &std::path::Path) -> Result<Option<Vec<deltas::Delta>>> {
let reader = self.gb_repository.get_wd_reader();
let deltas_path = self.gb_repository.deltas_path().join(path);
match reader.read_to_string(deltas_path.to_str().unwrap()) {
Ok(content) => Ok(Some(serde_json::from_str(&content)?)),
Err(reader::Error::NotFound) => Ok(None),
Err(err) => Err(err.into()),
}
}
pub fn register<P: AsRef<std::path::Path>>(&self, path: P) -> Result<()> {
let project = self
.project_store
.get_project(&self.project_id)
.with_context(|| "failed to get project")?;
if project.is_none() {
return Err(anyhow::anyhow!("project not found"));
}
let project = project.unwrap();
let project_repository = project_repository::Repository::open(&project)
.with_context(|| "failed to open project repository for project")?;
let path = path.as_ref();
let current_file_content = match self
.get_current_file_content(&project_repository, &path)
.with_context(|| "failed to get current file content")?
{
Some(content) => content,
None => return Ok(()),
};
let latest_file_content = self
.get_latest_file_contents(&project_repository, &path)
.with_context(|| "failed to get latest file content")?;
let current_deltas = self
.get_current_deltas(&path)
.with_context(|| "failed to get current deltas")?;
let mut text_doc = match (latest_file_content, current_deltas) {
(Some(latest_contents), Some(deltas)) => {
deltas::TextDocument::new(Some(&latest_contents), deltas)?
}
(Some(latest_contents), None) => {
deltas::TextDocument::new(Some(&latest_contents), vec![])?
}
(None, Some(deltas)) => deltas::TextDocument::new(None, deltas)?,
(None, None) => deltas::TextDocument::new(None, vec![])?,
};
if !text_doc.update(&current_file_content)? {
log::debug!(
"{}: {} no new deltas, ignoring",
self.project_id,
path.display()
);
return Ok(());
}
log::info!("{}: {} changed", self.project_id, path.display());
let writer = self.gb_repository.get_wd_writer();
// save current deltas
writer
.write_string(
self.gb_repository
.deltas_path()
.join(path)
.to_str()
.unwrap(),
&serde_json::to_string(&text_doc.get_deltas())?,
)
.with_context(|| "failed to write deltas")?;
// save file contents corresponding to the deltas
writer
.write_string(
self.gb_repository.wd_path().join(path).to_str().unwrap(),
&current_file_content,
)
.with_context(|| "failed to write file content")?;
Ok(())
}
}

View File

@ -1,11 +1,12 @@
use crate::{
app::{gb_repository, project_repository},
deltas, projects,
};
use anyhow::Result;
use tempfile::tempdir;
use super::file_change::Listener;
use crate::{
app::{gb_repository, project_repository},
deltas, projects, storage,
};
use super::project_file_change::Listener;
fn commit_all(repository: &git2::Repository) -> Result<git2::Oid> {
let mut index = repository.index()?;
@ -54,14 +55,21 @@ fn test_project(repository: &git2::Repository) -> Result<projects::Project> {
Ok(project)
}
fn project_store(project: &projects::Project) -> Result<projects::Storage> {
let storage = storage::Storage::from_path(tempdir()?.path().to_path_buf());
let store = projects::Storage::new(storage);
store.add_project(project)?;
Ok(store)
}
#[test]
fn test_register_existing_file() -> Result<()> {
let repository = test_repository()?;
let project = test_project(&repository)?;
let project_repo = project_repository::Repository::open(&project)?;
let gb_repo_path = tempdir()?.path().to_str().unwrap().to_string();
let gb_repo = gb_repository::Repository::open(gb_repo_path, &project)?;
let listener = Listener::new(&project, &project_repo, &gb_repo);
let gb_repo = gb_repository::Repository::open(gb_repo_path, project.id.clone())?;
let listener = Listener::new(project.id.clone(), project_store(&project)?, &gb_repo);
let file_path = std::path::Path::new("test.txt");
std::fs::write(project_repo.root().join(file_path), "test")?;
@ -92,8 +100,8 @@ fn test_register_new_file() -> Result<()> {
let project = test_project(&repository)?;
let project_repo = project_repository::Repository::open(&project)?;
let gb_repo_path = tempdir()?.path().to_str().unwrap().to_string();
let gb_repo = gb_repository::Repository::open(gb_repo_path, &project)?;
let listener = Listener::new(&project, &project_repo, &gb_repo);
let gb_repo = gb_repository::Repository::open(gb_repo_path, project.id.clone())?;
let listener = Listener::new(project.id.clone(), project_store(&project)?, &gb_repo);
let file_path = std::path::Path::new("test.txt");
std::fs::write(project_repo.root().join(file_path), "test")?;
@ -122,8 +130,8 @@ fn test_register_new_file_twice() -> Result<()> {
let project = test_project(&repository)?;
let project_repo = project_repository::Repository::open(&project)?;
let gb_repo_path = tempdir()?.path().to_str().unwrap().to_string();
let gb_repo = gb_repository::Repository::open(gb_repo_path, &project)?;
let listener = Listener::new(&project, &project_repo, &gb_repo);
let gb_repo = gb_repository::Repository::open(gb_repo_path, project.id.clone())?;
let listener = Listener::new(project.id.clone(), project_store(&project)?, &gb_repo);
let file_path = std::path::Path::new("test.txt");
std::fs::write(project_repo.root().join(file_path), "test")?;

View File

@ -1,7 +1,9 @@
use super::reader;
use crate::projects;
use anyhow::{Context, Result};
use crate::projects;
use super::reader;
pub struct Repository {
git_repository: git2::Repository,
}

View File

@ -1,6 +1,7 @@
use crate::fs;
use anyhow::{Context, Result};
use crate::fs;
#[derive(Debug, PartialEq)]
pub enum Content {
UTF8(String),

View File

@ -1,7 +1,8 @@
use super::reader::{self, Reader};
use anyhow::Result;
use tempfile::tempdir;
use super::reader::{self, Reader};
fn commit(repository: &git2::Repository) -> Result<git2::Oid> {
let mut index = repository.index()?;
index.add_all(&["."], git2::IndexAddOption::DEFAULT, None)?;

View File

@ -1,11 +1,14 @@
use std::collections::HashMap;
use anyhow::{anyhow, Context, Result};
use crate::{deltas, pty, sessions};
use super::{
gb_repository as repository,
reader::{self, Reader},
writer::{self, Writer},
};
use crate::{deltas, pty, sessions};
use anyhow::{anyhow, Context, Result};
use std::collections::HashMap;
pub struct SessionWriter<'writer> {
repository: &'writer repository::Repository,

View File

@ -1,22 +1,23 @@
use super::{gb_repository, project_repository};
use std::{sync, time};
use anyhow::{Context, Result};
use crossbeam_channel::{bounded, select, unbounded};
use crate::{
app::{dispatchers, listeners},
events, projects,
};
use anyhow::{Context, Result};
use core::time;
use crossbeam_channel::{bounded, select, unbounded};
use std::sync;
use super::{gb_repository, project_repository};
pub struct Watcher<'watcher> {
project: &'watcher projects::Project,
gb_repository: &'watcher gb_repository::Repository,
project_repository: &'watcher project_repository::Repository,
project_id: String,
tick_dispatcher: dispatchers::tick::Dispatcher,
file_change_dispatcher: dispatchers::file_change::Dispatcher,
file_change_listener: listeners::file_change::Listener<'watcher>,
check_current_session_listener: listeners::check_current_session::Listener<'watcher>,
stop: (
crossbeam_channel::Sender<()>,
@ -26,23 +27,39 @@ pub struct Watcher<'watcher> {
impl<'watcher> Watcher<'watcher> {
pub fn new(
project: &'watcher projects::Project,
project_id: String,
project_store: projects::Storage,
gb_repository: &'watcher gb_repository::Repository,
project_repository: &'watcher project_repository::Repository,
) -> Self {
Self {
gb_repository,
project_repository,
project,
tick_dispatcher: dispatchers::tick::Dispatcher::new(project),
file_change_dispatcher: dispatchers::file_change::Dispatcher::new(project),
events: sync::mpsc::Sender<events::Event>,
) -> Result<Self> {
let project = project_store
.get_project(&project_id)
.context("failed to get project")?;
if project.is_none() {
return Err(anyhow::anyhow!("project not found"));
}
let project = project.unwrap();
Ok(Self {
project_id: project_id.clone(),
tick_dispatcher: dispatchers::tick::Dispatcher::new(project_id.clone()),
file_change_dispatcher: dispatchers::file_change::Dispatcher::new(
project_id.clone(),
project.path,
),
file_change_listener: listeners::file_change::Listener::new(
project,
project_repository,
project_id.clone(),
project_store,
gb_repository,
events,
),
check_current_session_listener: listeners::check_current_session::Listener::new(
gb_repository,
),
stop: bounded(1),
}
})
}
pub fn stop(&self) -> anyhow::Result<()> {
@ -50,10 +67,10 @@ impl<'watcher> Watcher<'watcher> {
Ok(())
}
pub fn start(&self, events: sync::mpsc::Sender<events::Event>) -> Result<()> {
pub fn start(&self) -> Result<()> {
let (t_tx, t_rx) = unbounded();
let tick_dispatcher = self.tick_dispatcher.clone();
let project_id = self.project.id.clone();
let project_id = self.project_id.clone();
tauri::async_runtime::spawn_blocking(move || {
if let Err(e) = tick_dispatcher.start(time::Duration::from_secs(10), t_tx) {
@ -63,7 +80,7 @@ impl<'watcher> Watcher<'watcher> {
let (fw_tx, fw_rx) = unbounded();
let file_change_dispatcher = self.file_change_dispatcher.clone();
let project_id = self.project.id.clone();
let project_id = self.project_id.clone();
tauri::async_runtime::spawn_blocking(move || {
if let Err(e) = file_change_dispatcher.start(fw_tx) {
log::error!("{}: failed to start file watcher: {:#}", project_id, e);
@ -72,28 +89,32 @@ impl<'watcher> Watcher<'watcher> {
loop {
select! {
recv(t_rx) -> ts => {
let ts = ts.context("failed to receive tick event")?;
log::info!("{}: ticker ticked: {}", self.project.id, ts.elapsed().as_secs());
}
recv(fw_rx)-> path => {
let path = path.context("failed to receive file change event")?;
if !path.starts_with(".git") {
recv(t_rx) -> ts => match ts{
Ok(ts) => {
if let Err(e) = self.check_current_session_listener.register(ts) {
log::error!("{}: failed to handle tick event: {:#}", self.project_id, e);
}
}
Err(e) => {
log::error!("{}: failed to receive tick event: {:#}", self.project_id, e);
}
},
recv(fw_rx)-> path => match path {
Ok(path) => {
if let Err(e) = self.file_change_listener.register(&path) {
log::error!("{}: failed to handle file change: {:#}", self.project.id, e);
}
} else {
if let Err(e) = self.on_git_file_change(path.to_str().unwrap(), &events) {
log::error!("{}: failed to handle git file change: {:#}", self.project.id, e);
log::error!("{}: failed to handle file change: {:#}", self.project_id, e);
}
},
Err(e) => {
log::error!("{}: failed to receive file change event: {:#}", self.project_id, e);
}
},
recv(self.stop.1) -> _ => {
if let Err(e) = self.tick_dispatcher.stop() {
log::error!("{}: failed to stop ticker: {:#}", self.project.id, e);
log::error!("{}: failed to stop ticker: {:#}", self.project_id, e);
}
if let Err(e) = self.file_change_dispatcher.stop() {
log::error!("{}: failed to stop file watcher: {:#}", self.project.id, e);
log::error!("{}: failed to stop file watcher: {:#}", self.project_id, e);
}
break;
}
@ -102,36 +123,4 @@ impl<'watcher> Watcher<'watcher> {
Ok(())
}
fn on_git_file_change(
&self,
path: &str,
events: &sync::mpsc::Sender<events::Event>,
) -> Result<()> {
let event = if path.eq(".git/logs/HEAD") {
log::info!("{}: git activity", self.project.id);
Some(events::Event::git_activity(&self.project))
} else if path.eq(".git/HEAD") {
log::info!("{}: git head changed", self.project.id);
let head_ref = self.project_repository.head()?;
if let Some(head) = head_ref.name() {
Some(events::Event::git_head(&self.project, &head))
} else {
None
}
} else if path.eq(".git/index") {
log::info!("{}: git index changed", self.project.id);
Some(events::Event::git_index(&self.project))
} else {
None
};
if let Some(event) = event {
events
.send(event)
.with_context(|| "failed to send git event")?;
}
Ok(())
}
}

View File

@ -1,6 +1,7 @@
use anyhow::{Context, Result};
use std::io::Write;
use anyhow::{Context, Result};
pub trait Writer {
fn write_string(&self, path: &str, contents: &str) -> Result<()>;
fn append_string(&self, path: &str, contents: &str) -> Result<()>;

View File

@ -718,22 +718,24 @@ fn init(app_handle: tauri::AppHandle) -> Result<()> {
.watch(tx.clone(), &repo)
.with_context(|| format!("{}: failed to watch project", project.id))?;
// let local_data_dir = app_handle
// .path_resolver()
// .app_local_data_dir()
// .expect("failed to get local data dir");
let local_data_dir = app_handle
.path_resolver()
.app_local_data_dir()
.expect("failed to get local data dir");
// let p = project.clone();
// tauri::async_runtime::spawn_blocking(|| {
// let project = p;
// let gb_repo = app::gb_repository::Repository::open(local_data_dir, &project)
// .expect("failed to open gb repository");
// let p_repo = app::project_repository::Repository::open(&project)
// .expect("failed to open project repository");
// let w = app::watcher::Watcher::new(&project, &gb_repo, &p_repo);
// let (tx, rx) = std::sync::mpsc::channel::<events::Event>();
// w.start(tx).expect("failed to start watcher");
// });
let p = project.clone();
let ps = app_state.projects_storage.clone();
tauri::async_runtime::spawn_blocking(|| {
let project = p;
let project_storage = ps;
let gb_repo = app::gb_repository::Repository::open(local_data_dir, project.id.clone())
.expect("failed to open gb repository");
let (tx, _rx) = std::sync::mpsc::channel::<events::Event>();
let w = app::watcher::Watcher::new(project.id, project_storage, &gb_repo, tx)
.expect("failed to create watcher");
w.start().expect("failed to start watcher");
});
if let Err(err) = app_state
.deltas_searcher