start recording new sessions

This commit is contained in:
Nikita Galaiko 2023-03-03 13:21:06 +01:00
parent d962e2d7d1
commit 887b676acb
5 changed files with 146 additions and 142 deletions

View File

@ -13,7 +13,10 @@ use anyhow::{Context, Result};
use deltas::Delta; use deltas::Delta;
use log; use log;
use serde::{ser::SerializeMap, Serialize}; use serde::{ser::SerializeMap, Serialize};
use std::{collections::HashMap, sync::mpsc}; use std::{
collections::HashMap,
sync::{mpsc, Mutex},
};
use storage::Storage; use storage::Storage;
use tauri::{generate_context, Manager}; use tauri::{generate_context, Manager};
use tauri_plugin_log::{ use tauri_plugin_log::{
@ -56,6 +59,34 @@ impl From<anyhow::Error> for Error {
} }
} }
struct App {
pub projects_storage: projects::Storage,
pub users_storage: users::Storage,
pub deltas_searcher: search::Deltas,
pub watchers: Mutex<watchers::Watcher>,
}
impl App {
pub fn new(resolver: tauri::PathResolver) -> Self {
let local_data_dir = resolver.app_local_data_dir().unwrap();
let storage = Storage::from_path_resolver(&resolver);
let projects_storage = projects::Storage::new(storage.clone());
let users_storage = users::Storage::new(storage.clone());
let deltas_searcher = search::Deltas::at(local_data_dir);
let watchers = watchers::Watcher::new(
projects_storage.clone(),
users_storage.clone(),
deltas_searcher.clone(),
);
Self {
projects_storage,
users_storage,
deltas_searcher,
watchers: Mutex::new(watchers),
}
}
}
const IS_DEV: bool = cfg!(debug_assertions); const IS_DEV: bool = cfg!(debug_assertions);
fn app_title() -> String { fn app_title() -> String {
@ -110,13 +141,14 @@ fn list_sessions(
handle: tauri::AppHandle, handle: tauri::AppHandle,
project_id: &str, project_id: &str,
) -> Result<Vec<sessions::Session>, Error> { ) -> Result<Vec<sessions::Session>, Error> {
let path_resolver = handle.path_resolver(); let app_state = handle.state::<App>();
let storage = storage::Storage::from_path_resolver(&path_resolver);
let projects_storage = projects::Storage::new(storage.clone());
let users_storage = users::Storage::new(storage);
let repo = repositories::Repository::open(&projects_storage, &users_storage, project_id) let repo = repositories::Repository::open(
.with_context(|| format!("Failed to open repository for project {}", project_id))?; &app_state.projects_storage,
&app_state.users_storage,
project_id,
)
.with_context(|| format!("Failed to open repository for project {}", project_id))?;
let sessions = repo let sessions = repo
.sessions() .sessions()
@ -127,11 +159,10 @@ fn list_sessions(
#[tauri::command] #[tauri::command]
fn get_user(handle: tauri::AppHandle) -> Result<Option<users::User>, Error> { fn get_user(handle: tauri::AppHandle) -> Result<Option<users::User>, Error> {
let path_resolver = handle.path_resolver(); let app_state = handle.state::<App>();
let storage = storage::Storage::from_path_resolver(&path_resolver);
let users_storage = users::Storage::new(storage);
match users_storage match app_state
.users_storage
.get() .get()
.with_context(|| "Failed to get user".to_string())? .with_context(|| "Failed to get user".to_string())?
{ {
@ -157,11 +188,10 @@ fn get_user(handle: tauri::AppHandle) -> Result<Option<users::User>, Error> {
#[tauri::command] #[tauri::command]
fn set_user(handle: tauri::AppHandle, user: users::User) -> Result<(), Error> { fn set_user(handle: tauri::AppHandle, user: users::User) -> Result<(), Error> {
let path_resolver = handle.path_resolver(); let app_state = handle.state::<App>();
let storage = storage::Storage::from_path_resolver(&path_resolver);
let users_storage = users::Storage::new(storage);
users_storage app_state
.users_storage
.set(&user) .set(&user)
.with_context(|| "Failed to set user".to_string())?; .with_context(|| "Failed to set user".to_string())?;
@ -172,11 +202,10 @@ fn set_user(handle: tauri::AppHandle, user: users::User) -> Result<(), Error> {
#[tauri::command] #[tauri::command]
fn delete_user(handle: tauri::AppHandle) -> Result<(), Error> { fn delete_user(handle: tauri::AppHandle) -> Result<(), Error> {
let path_resolver = handle.path_resolver(); let app_state = handle.state::<App>();
let storage = storage::Storage::from_path_resolver(&path_resolver);
let users_storage = users::Storage::new(storage);
users_storage app_state
.users_storage
.delete() .delete()
.with_context(|| "Failed to delete user".to_string())?; .with_context(|| "Failed to delete user".to_string())?;
@ -190,11 +219,10 @@ fn update_project(
handle: tauri::AppHandle, handle: tauri::AppHandle,
project: projects::UpdateRequest, project: projects::UpdateRequest,
) -> Result<projects::Project, Error> { ) -> Result<projects::Project, Error> {
let path_resolver = handle.path_resolver(); let app_state = handle.state::<App>();
let storage = storage::Storage::from_path_resolver(&path_resolver);
let projects_storage = projects::Storage::new(storage);
let project = projects_storage let project = app_state
.projects_storage
.update_project(&project) .update_project(&project)
.with_context(|| format!("Failed to update project {}", project.id))?; .with_context(|| format!("Failed to update project {}", project.id))?;
@ -203,18 +231,10 @@ fn update_project(
#[tauri::command] #[tauri::command]
fn add_project(handle: tauri::AppHandle, path: &str) -> Result<projects::Project, Error> { fn add_project(handle: tauri::AppHandle, path: &str) -> Result<projects::Project, Error> {
let path_resolver = handle.path_resolver(); let app_state = handle.state::<App>();
let storage = storage::Storage::from_path_resolver(&path_resolver);
let projects_storage = projects::Storage::new(storage.clone());
let users_storage = users::Storage::new(storage);
let watchers_collection = handle.state::<watchers::WatcherCollection>();
let watchers = watchers::Watcher::new(
&watchers_collection,
projects_storage.clone(),
users_storage.clone(),
);
for project in projects_storage for project in app_state
.projects_storage
.list_projects() .list_projects()
.with_context(|| "Failed to list projects".to_string())? .with_context(|| "Failed to list projects".to_string())?
{ {
@ -222,21 +242,23 @@ fn add_project(handle: tauri::AppHandle, path: &str) -> Result<projects::Project
if !project.deleted { if !project.deleted {
return Err(Error::ProjectAlreadyExists); return Err(Error::ProjectAlreadyExists);
} else { } else {
projects_storage.update_project(&projects::UpdateRequest { app_state
id: project.id.clone(), .projects_storage
deleted: Some(false), .update_project(&projects::UpdateRequest {
..Default::default() id: project.id.clone(),
})?; deleted: Some(false),
..Default::default()
})?;
return Ok(project); return Ok(project);
} }
} }
} }
let project = projects::Project::from_path(path.to_string())?; let project = projects::Project::from_path(path.to_string())?;
projects_storage.add_project(&project)?; app_state.projects_storage.add_project(&project)?;
let (tx, rx): (mpsc::Sender<events::Event>, mpsc::Receiver<events::Event>) = mpsc::channel(); let (tx, rx): (mpsc::Sender<events::Event>, mpsc::Receiver<events::Event>) = mpsc::channel();
watchers.watch(tx, &project)?; app_state.watchers.lock().unwrap().watch(tx, &project)?;
watch_events(handle, rx); watch_events(handle, rx);
Ok(project) Ok(project)
@ -244,37 +266,28 @@ fn add_project(handle: tauri::AppHandle, path: &str) -> Result<projects::Project
#[tauri::command] #[tauri::command]
fn list_projects(handle: tauri::AppHandle) -> Result<Vec<projects::Project>, Error> { fn list_projects(handle: tauri::AppHandle) -> Result<Vec<projects::Project>, Error> {
let path_resolver = handle.path_resolver(); let app_state = handle.state::<App>();
let storage = storage::Storage::from_path_resolver(&path_resolver);
let projects_storage = projects::Storage::new(storage);
let projects = projects_storage.list_projects()?; let projects = app_state.projects_storage.list_projects()?;
Ok(projects) Ok(projects)
} }
#[tauri::command] #[tauri::command]
fn delete_project(handle: tauri::AppHandle, id: &str) -> Result<(), Error> { fn delete_project(handle: tauri::AppHandle, id: &str) -> Result<(), Error> {
let path_resolver = handle.path_resolver(); let app_state = handle.state::<App>();
let storage = storage::Storage::from_path_resolver(&path_resolver);
let projects_storage = projects::Storage::new(storage.clone());
let watchers_collection = handle.state::<watchers::WatcherCollection>();
let users_storage = users::Storage::new(storage);
let watchers = watchers::Watcher::new(
&watchers_collection,
projects_storage.clone(),
users_storage.clone(),
);
match projects_storage.get_project(id)? { match app_state.projects_storage.get_project(id)? {
Some(project) => { Some(project) => {
watchers.unwatch(project)?; app_state.watchers.lock().unwrap().unwatch(project)?;
projects_storage.update_project(&projects::UpdateRequest { app_state
id: id.to_string(), .projects_storage
deleted: Some(true), .update_project(&projects::UpdateRequest {
..Default::default() id: id.to_string(),
})?; deleted: Some(true),
..Default::default()
})?;
Ok(()) Ok(())
} }
@ -289,12 +302,13 @@ fn list_session_files(
session_id: &str, session_id: &str,
paths: Option<Vec<&str>>, paths: Option<Vec<&str>>,
) -> Result<HashMap<String, String>, Error> { ) -> Result<HashMap<String, String>, Error> {
let path_resolver = handle.path_resolver(); let app_state = handle.state::<App>();
let storage = storage::Storage::from_path_resolver(&path_resolver);
let projects_storage = projects::Storage::new(storage.clone());
let users_storage = users::Storage::new(storage);
let repo = repositories::Repository::open(&projects_storage, &users_storage, project_id)?; let repo = repositories::Repository::open(
&app_state.projects_storage,
&app_state.users_storage,
project_id,
)?;
let files = repo.files(session_id, paths)?; let files = repo.files(session_id, paths)?;
@ -307,12 +321,13 @@ fn list_deltas(
project_id: &str, project_id: &str,
session_id: &str, session_id: &str,
) -> Result<HashMap<String, Vec<Delta>>, Error> { ) -> Result<HashMap<String, Vec<Delta>>, Error> {
let path_resolver = handle.path_resolver(); let app_state = handle.state::<App>();
let storage = storage::Storage::from_path_resolver(&path_resolver);
let projects_storage = projects::Storage::new(storage.clone());
let users_storage = users::Storage::new(storage);
let repo = repositories::Repository::open(&projects_storage, &users_storage, project_id)?; let repo = repositories::Repository::open(
&app_state.projects_storage,
&app_state.users_storage,
project_id,
)?;
let deltas = repo.deltas(session_id)?; let deltas = repo.deltas(session_id)?;
@ -382,50 +397,36 @@ fn main() {
#[cfg(debug_assertions)] #[cfg(debug_assertions)]
window.open_devtools(); window.open_devtools();
let mut app_state: App = App::new(app.path_resolver());
let resolver = app.path_resolver(); let resolver = app.path_resolver();
log::info!( let local_data_dir = resolver.app_local_data_dir().unwrap();
"Local data dir: {:?}", log::info!("Local data dir: {:?}", local_data_dir,);
resolver.app_local_data_dir().unwrap()
);
let storage = Storage::from_path_resolver(&resolver); if let Some(user) = app_state.users_storage.get().expect("Failed to get user") {
let projects_storage = projects::Storage::new(storage.clone()); sentry::configure_scope(|scope| scope.set_user(Some(user.clone().into())))
let users_storage = users::Storage::new(storage); }
let watcher_collection = watchers::WatcherCollection::default();
let watchers = watchers::Watcher::new(
&watcher_collection,
projects_storage.clone(),
users_storage.clone(),
);
users_storage
.get()
.and_then(|user| match user {
Some(user) => {
sentry::configure_scope(|scope| scope.set_user(Some(user.clone().into())));
Ok(())
}
None => Ok(()),
})
.expect("Failed to set user");
let (tx, rx): (mpsc::Sender<events::Event>, mpsc::Receiver<events::Event>) = let (tx, rx): (mpsc::Sender<events::Event>, mpsc::Receiver<events::Event>) =
mpsc::channel(); mpsc::channel();
match projects_storage.list_projects() { let projects = app_state
Ok(projects) => { .projects_storage
for project in projects { .list_projects()
watchers .expect("Failed to list projects");
.watch(tx.clone(), &project)
.with_context(|| format!("Failed to watch project: {}", project.id))? for project in projects {
} app_state
} .watchers
Err(e) => log::error!("Failed to list projects: {:#}", e), .lock()
.unwrap()
.watch(tx.clone(), &project)
.with_context(|| format!("Failed to watch project: {}", project.id))?
} }
watch_events(app.handle(), rx); watch_events(app.handle(), rx);
app.manage(watcher_collection); app.manage(app_state);
Ok(()) Ok(())
}) })

View File

@ -9,6 +9,7 @@ use std::{
}; };
use tantivy::{collector, directory::MmapDirectory, schema, IndexWriter}; use tantivy::{collector, directory::MmapDirectory, schema, IndexWriter};
#[derive(Clone)]
pub struct Deltas { pub struct Deltas {
base_path: String, base_path: String,
@ -58,6 +59,7 @@ impl Deltas {
project: &projects::Project, project: &projects::Project,
session: &sessions::Session, session: &sessions::Session,
) -> Result<()> { ) -> Result<()> {
log::info!("Indexing session {} in {}", session.id, project.path);
self.init(&project.id)?; self.init(&project.id)?;
index( index(
&self.indexes.get(&project.id).unwrap(), &self.indexes.get(&project.id).unwrap(),

View File

@ -9,20 +9,19 @@ use std::path::Path;
use std::sync::mpsc; use std::sync::mpsc;
use std::{collections::HashMap, sync::Mutex}; use std::{collections::HashMap, sync::Mutex};
#[derive(Default)] pub struct DeltaWatchers {
pub struct WatcherCollection(Mutex<HashMap<String, RecommendedWatcher>>); watchers: HashMap<String, RecommendedWatcher>,
pub struct DeltaWatchers<'a> {
watchers: &'a WatcherCollection,
} }
impl<'a> DeltaWatchers<'a> { impl DeltaWatchers {
pub fn new(watchers: &'a WatcherCollection) -> Self { pub fn new() -> Self {
Self { watchers } Self {
watchers: Default::default(),
}
} }
pub fn watch( pub fn watch(
&self, &mut self,
sender: mpsc::Sender<events::Event>, sender: mpsc::Sender<events::Event>,
project: projects::Project, project: projects::Project,
) -> Result<()> { ) -> Result<()> {
@ -34,11 +33,7 @@ impl<'a> DeltaWatchers<'a> {
watcher.watch(project_path, RecursiveMode::Recursive)?; watcher.watch(project_path, RecursiveMode::Recursive)?;
self.watchers self.watchers.insert(project.path.clone(), watcher);
.0
.lock()
.unwrap()
.insert(project.path.clone(), watcher);
let repo = git2::Repository::open(project_path)?; let repo = git2::Repository::open(project_path)?;
tauri::async_runtime::spawn_blocking(move || { tauri::async_runtime::spawn_blocking(move || {
@ -92,9 +87,8 @@ impl<'a> DeltaWatchers<'a> {
Ok(()) Ok(())
} }
pub fn unwatch(&self, project: projects::Project) -> Result<()> { pub fn unwatch(&mut self, project: projects::Project) -> Result<()> {
let mut watchers = self.watchers.0.lock().unwrap(); if let Some(mut watcher) = self.watchers.remove(&project.path) {
if let Some(mut watcher) = watchers.remove(&project.path) {
watcher.unwatch(Path::new(&project.path))?; watcher.unwatch(Path::new(&project.path))?;
} }
Ok(()) Ok(())

View File

@ -1,24 +1,23 @@
mod delta; mod delta;
mod session; mod session;
pub use self::delta::WatcherCollection; use crate::{events, projects, search, users};
use crate::{events, projects, users};
use anyhow::Result; use anyhow::Result;
use std::sync::mpsc; use std::sync::mpsc;
pub struct Watcher<'a> { pub struct Watcher {
session_watcher: session::SessionWatcher, session_watcher: session::SessionWatcher,
delta_watcher: delta::DeltaWatchers<'a>, delta_watcher: delta::DeltaWatchers,
} }
impl<'a> Watcher<'a> { impl Watcher {
pub fn new( pub fn new(
watchers: &'a delta::WatcherCollection,
projects_storage: projects::Storage, projects_storage: projects::Storage,
users_storage: users::Storage, users_storage: users::Storage,
deltas_searcher: search::Deltas,
) -> Self { ) -> Self {
let session_watcher = session::SessionWatcher::new(projects_storage, users_storage); let session_watcher = session::SessionWatcher::new(projects_storage, users_storage, deltas_searcher);
let delta_watcher = delta::DeltaWatchers::new(watchers); let delta_watcher = delta::DeltaWatchers::new();
Self { Self {
session_watcher, session_watcher,
delta_watcher, delta_watcher,
@ -26,7 +25,7 @@ impl<'a> Watcher<'a> {
} }
pub fn watch( pub fn watch(
&self, &mut self,
sender: mpsc::Sender<events::Event>, sender: mpsc::Sender<events::Event>,
project: &projects::Project, project: &projects::Project,
) -> Result<()> { ) -> Result<()> {
@ -36,7 +35,7 @@ impl<'a> Watcher<'a> {
Ok(()) Ok(())
} }
pub fn unwatch(&self, project: projects::Project) -> Result<()> { pub fn unwatch(&mut self, project: projects::Project) -> Result<()> {
self.delta_watcher.unwatch(project)?; self.delta_watcher.unwatch(project)?;
// TODO: how to unwatch session ? // TODO: how to unwatch session ?
Ok(()) Ok(())

View File

@ -1,4 +1,4 @@
use crate::{events, projects, sessions, users}; use crate::{events, projects, search, sessions, users};
use anyhow::{Context, Result}; use anyhow::{Context, Result};
use git2::Repository; use git2::Repository;
use std::{ use std::{
@ -10,21 +10,27 @@ use std::{
const FIVE_MINUTES: u128 = Duration::new(5 * 60, 0).as_millis(); const FIVE_MINUTES: u128 = Duration::new(5 * 60, 0).as_millis();
const ONE_HOUR: u128 = Duration::new(60 * 60, 0).as_millis(); const ONE_HOUR: u128 = Duration::new(60 * 60, 0).as_millis();
#[derive(Debug, Clone)] #[derive(Clone)]
pub struct SessionWatcher { pub struct SessionWatcher {
projects_storage: projects::Storage, projects_storage: projects::Storage,
users_storage: users::Storage, users_storage: users::Storage,
deltas_searcher: search::Deltas,
} }
impl SessionWatcher { impl<'a> SessionWatcher {
pub fn new(projects_storage: projects::Storage, users_storage: users::Storage) -> Self { pub fn new(
projects_storage: projects::Storage,
users_storage: users::Storage,
deltas_searcher: search::Deltas,
) -> Self {
Self { Self {
projects_storage, projects_storage,
users_storage, users_storage,
deltas_searcher,
} }
} }
fn run(&self, project_id: &str, sender: mpsc::Sender<events::Event>) -> Result<()> { fn run(&mut self, project_id: &str, sender: mpsc::Sender<events::Event>) -> Result<()> {
match self match self
.projects_storage .projects_storage
.get_project(&project_id) .get_project(&project_id)
@ -42,7 +48,6 @@ impl SessionWatcher {
})?; })?;
match self.check_for_changes(&project, &user)? { match self.check_for_changes(&project, &user)? {
Some(session) => { Some(session) => {
// index
sender sender
.send(events::Event::session(&project, &session)) .send(events::Event::session(&project, &session))
.with_context(|| { .with_context(|| {
@ -71,11 +76,11 @@ impl SessionWatcher {
log::info!("Watching sessions for {}", project.path); log::info!("Watching sessions for {}", project.path);
let shared_self = self.clone(); let shared_self = self.clone();
let self_copy = shared_self.clone(); let mut self_copy = shared_self.clone();
let project_id = project.id; let project_id = project.id;
tauri::async_runtime::spawn_blocking(move || loop { tauri::async_runtime::spawn_blocking(move || loop {
let local_self = &self_copy; let local_self = &mut self_copy;
if let Err(e) = local_self.run(&project_id, sender.clone()) { if let Err(e) = local_self.run(&project_id, sender.clone()) {
log::error!("Error while running git watcher: {:#}", e); log::error!("Error while running git watcher: {:#}", e);
} }
@ -94,7 +99,7 @@ impl SessionWatcher {
// //
// returns a commited session if created // returns a commited session if created
fn check_for_changes( fn check_for_changes(
&self, &mut self,
project: &projects::Project, project: &projects::Project,
user: &Option<users::User>, user: &Option<users::User>,
) -> Result<Option<sessions::Session>> { ) -> Result<Option<sessions::Session>> {
@ -107,6 +112,9 @@ impl SessionWatcher {
session session
.flush(&repo, user, project) .flush(&repo, user, project)
.with_context(|| "Error while flushing session")?; .with_context(|| "Error while flushing session")?;
self.deltas_searcher
.index(&repo, &project, &session)
.with_context(|| format!("Error while indexing session {}", session.id))?;
Ok(Some(session)) Ok(Some(session))
} }
} }