add watcher crate with code copied from gitbutler-tauri/src/watcher.

It's not functional yet, but exactly at the spot where it needs fixing
across the boundary that is:

- a stand-in for application events
- analytics
This commit is contained in:
Sebastian Thiel 2024-04-17 19:11:24 +02:00
parent 17d9a1bcdd
commit 5223c197d2
No known key found for this signature in database
GPG Key ID: 9CB5EE7895E8268B
16 changed files with 3146 additions and 0 deletions

22
Cargo.lock generated
View File

@ -2144,6 +2144,28 @@ dependencies = [
"tempfile", "tempfile",
] ]
[[package]]
name = "gitbutler-watcher"
version = "0.0.0"
dependencies = [
"anyhow",
"async-trait",
"backoff",
"crossbeam-channel",
"futures",
"gitbutler-core",
"gitbutler-testsupport",
"itertools 0.12.1",
"notify",
"notify-debouncer-full",
"tauri",
"tempfile",
"thiserror",
"tokio",
"tokio-util",
"tracing",
]
[[package]] [[package]]
name = "glib" name = "glib"
version = "0.15.12" version = "0.15.12"

View File

@ -4,6 +4,7 @@ members = [
"crates/gitbutler-tauri", "crates/gitbutler-tauri",
"crates/gitbutler-changeset", "crates/gitbutler-changeset",
"crates/gitbutler-git", "crates/gitbutler-git",
"crates/gitbutler-watcher",
"crates/gitbutler-testsupport", "crates/gitbutler-testsupport",
] ]
resolver = "2" resolver = "2"

View File

@ -0,0 +1,43 @@
[package]
name = "gitbutler-watcher"
version = "0.0.0"
edition = "2021"
publish = false
[lib]
doctest = false
[dependencies]
gitbutler-core.workspace = true
thiserror.workspace = true
anyhow = "1.0.81"
futures = "0.3.30"
tokio = { workspace = true, features = [ "full", "sync" ] }
tokio-util = "0.7.10"
tracing = "0.1.40"
async-trait = "0.1.79"
backoff = "0.4.0"
notify = { version = "6.0.1" }
notify-debouncer-full = "0.3.1"
crossbeam-channel = "0.5.12"
itertools = "0.12"
# TODO(ST): remove this dependency, abstract it
[dependencies.tauri]
version = "1.6.1"
features = [
"http-all", "os-all", "dialog-open", "fs-read-file",
"path-all", "process-relaunch", "protocol-asset",
"shell-open", "window-maximize", "window-start-dragging",
"window-unmaximize"
]
[dev-dependencies]
tempfile = "3.10"
gitbutler-testsupport.workspace = true
[lints.clippy]
all = "deny"
perf = "deny"
correctness = "deny"

View File

@ -0,0 +1,98 @@
use std::fmt::Display;
use std::path::PathBuf;
use gitbutler_core::{projects::ProjectId, sessions};
/// An event for internal use, as merge between [super::file_monitor::Event] and [Event].
#[derive(Debug)]
pub(super) enum InternalEvent {
// From public API
Flush(ProjectId, sessions::Session),
CalculateVirtualBranches(ProjectId),
FetchGitbutlerData(ProjectId),
PushGitbutlerData(ProjectId),
// From file monitor
GitFilesChange(ProjectId, Vec<PathBuf>),
ProjectFilesChange(ProjectId, Vec<PathBuf>),
}
/// This type captures all operations that can be fed into a watcher that runs in the background.
// TODO(ST): This should not have to be implemented in the Watcher, figure out how this can be moved
// to application logic at least. However, it's called through a trait in `core`.
#[derive(Debug, PartialEq, Clone)]
pub enum Event {
Flush(ProjectId, sessions::Session),
CalculateVirtualBranches(ProjectId),
FetchGitbutlerData(ProjectId),
PushGitbutlerData(ProjectId),
}
impl Event {
pub fn project_id(&self) -> ProjectId {
match self {
Event::FetchGitbutlerData(project_id)
| Event::Flush(project_id, _)
| Event::CalculateVirtualBranches(project_id)
| Event::PushGitbutlerData(project_id) => *project_id,
}
}
}
impl From<Event> for InternalEvent {
fn from(value: Event) -> Self {
match value {
Event::Flush(a, b) => InternalEvent::Flush(a, b),
Event::CalculateVirtualBranches(v) => InternalEvent::CalculateVirtualBranches(v),
Event::FetchGitbutlerData(v) => InternalEvent::FetchGitbutlerData(v),
Event::PushGitbutlerData(v) => InternalEvent::PushGitbutlerData(v),
}
}
}
impl Display for InternalEvent {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
InternalEvent::FetchGitbutlerData(pid) => {
write!(f, "FetchGitbutlerData({})", pid,)
}
InternalEvent::Flush(project_id, session) => {
write!(f, "Flush({}, {})", project_id, session.id)
}
InternalEvent::GitFilesChange(project_id, paths) => {
write!(
f,
"GitFileChange({}, {})",
project_id,
comma_separated_paths(paths)
)
}
InternalEvent::ProjectFilesChange(project_id, paths) => {
write!(
f,
"ProjectFileChange({}, {})",
project_id,
comma_separated_paths(paths)
)
}
InternalEvent::CalculateVirtualBranches(pid) => write!(f, "VirtualBranch({})", pid),
InternalEvent::PushGitbutlerData(pid) => write!(f, "PushGitbutlerData({})", pid),
}
}
}
fn comma_separated_paths(paths: &[PathBuf]) -> String {
const MAX_LISTING: usize = 5;
let listing = paths
.iter()
.take(MAX_LISTING)
.filter_map(|path| path.to_str())
.collect::<Vec<_>>()
.join(", ");
let remaining = paths.len().saturating_sub(MAX_LISTING);
if remaining > 0 {
format!("{listing} […{remaining} more]")
} else {
listing
}
}

View File

@ -0,0 +1,226 @@
use std::collections::HashSet;
use std::path::Path;
use std::time::Duration;
use crate::events::InternalEvent;
use anyhow::{anyhow, Context, Result};
use gitbutler_core::{git, projects::ProjectId};
use notify::Watcher;
use notify_debouncer_full::new_debouncer;
use tokio::task;
use tracing::Level;
/// The timeout for debouncing file change events.
/// This is used to prevent multiple events from being sent for a single file change.
const DEBOUNCE_TIMEOUT: Duration = Duration::from_millis(100);
/// This error is required only because `anyhow::Error` isn't implementing `std::error::Error`, and [`spawn()`]
/// needs to wrap it into a `backoff::Error` which also has to implement the `Error` trait.
#[derive(Debug, thiserror::Error)]
#[error(transparent)]
struct RunError {
#[from]
source: anyhow::Error,
}
/// Listen to interesting filesystem events of files in `path` that are not `.gitignore`d,
/// turn them into [`Events`](Event) which classifies it, and associates it with `project_id`.
/// These are sent through the passed `out` channel, to indicate either **Git** repository changes
/// or **ProjectWorktree** changes
///
/// ### Why is this not an iterator?
///
/// The internal `notify_rx` could be an iterator, which performs all transformations and returns them as item.
/// However, due to closures being continuously created each time events come in, nested closures need to own
/// their resources which means they are `Clone` or `Copy`. This isn't the case for `git::Repository`.
/// Even though `gix::Repository` is `Clone`, an efficient implementation of `is_path_ignored()` requires more state
/// that ideally is kept between invocations. For that reason, the current channel-based 'worker' architecture
/// is chosen to allow all this state to live on the stack.
///
/// Additionally, a channel plays better with how events are handled downstream.
pub fn spawn(
project_id: ProjectId,
worktree_path: &std::path::Path,
out: tokio::sync::mpsc::UnboundedSender<InternalEvent>,
) -> Result<()> {
let (notify_tx, notify_rx) = std::sync::mpsc::channel();
let mut debouncer =
new_debouncer(DEBOUNCE_TIMEOUT, None, notify_tx).context("failed to create debouncer")?;
let policy = backoff::ExponentialBackoffBuilder::new()
.with_max_elapsed_time(Some(std::time::Duration::from_secs(30)))
.build();
// Start the watcher, but retry if there are transient errors.
backoff::retry(policy, || {
debouncer
.watcher()
.watch(worktree_path, notify::RecursiveMode::Recursive)
.map_err(|err| match err.kind {
notify::ErrorKind::PathNotFound => backoff::Error::permanent(RunError::from(
anyhow!("{} not found", worktree_path.display()),
)),
notify::ErrorKind::Io(_) | notify::ErrorKind::InvalidConfig(_) => {
backoff::Error::permanent(RunError::from(anyhow::Error::from(err)))
}
_ => backoff::Error::transient(RunError::from(anyhow::Error::from(err))),
})
})
.context("failed to start watcher")?;
let worktree_path = worktree_path.to_owned();
task::spawn_blocking(move || {
tracing::debug!(%project_id, "file watcher started");
let _debouncer = debouncer;
let _runtime = tracing::span!(Level::INFO, "file monitor", %project_id ).entered();
'outer: for result in notify_rx {
let stats = tracing::span!(
Level::INFO,
"handle debounced events",
ignored = tracing::field::Empty,
project = tracing::field::Empty,
project_dedup = tracing::field::Empty,
git = tracing::field::Empty,
git_dedup = tracing::field::Empty,
git_noop = tracing::field::Empty,
fs_events = tracing::field::Empty,
)
.entered();
let (mut ignored, mut git_noop) = (0, 0);
match result {
Err(err) => {
tracing::error!(?err, "ignored file watcher error");
}
Ok(events) => {
let maybe_repo = git::Repository::open(&worktree_path).with_context(
|| {
format!(
"failed to open project repository: {}",
worktree_path.display()
)
},
).map(Some).unwrap_or_else(|err| {
tracing::error!(?err, "will consider changes to all files as repository couldn't be opened");
None
});
let num_events = events.len();
let classified_file_paths = events
.into_iter()
.filter(|event| is_interesting_kind(event.kind))
.flat_map(|event| event.event.paths)
.map(|file| {
let kind = maybe_repo
.as_ref()
.map_or(FileKind::Project, |repo| classify_file(repo, &file));
(file, kind)
});
let (mut stripped_git_paths, mut worktree_relative_paths) =
(HashSet::new(), HashSet::new());
for (file_path, kind) in classified_file_paths {
match kind {
FileKind::ProjectIgnored => ignored += 1,
FileKind::GitUninteresting => git_noop += 1,
FileKind::Project | FileKind::Git => {
match file_path.strip_prefix(&worktree_path) {
Ok(relative_file_path) => {
if relative_file_path.as_os_str().is_empty() {
continue;
}
if let Ok(stripped) =
relative_file_path.strip_prefix(".git")
{
stripped_git_paths.insert(stripped.to_owned());
} else {
worktree_relative_paths
.insert(relative_file_path.to_owned());
};
}
Err(err) => {
tracing::error!(%project_id, ?err, "failed to strip prefix");
}
}
}
}
}
stats.record("fs_events", num_events);
stats.record("ignored", ignored);
stats.record("git_noop", git_noop);
stats.record("git", stripped_git_paths.len());
stats.record("project", worktree_relative_paths.len());
if !stripped_git_paths.is_empty() {
let paths_dedup: Vec<_> = stripped_git_paths.into_iter().collect();
stats.record("git_dedup", paths_dedup.len());
let event = InternalEvent::GitFilesChange(project_id, paths_dedup);
if out.send(event).is_err() {
tracing::info!("channel closed - stopping file watcher");
break 'outer;
}
}
if !worktree_relative_paths.is_empty() {
let paths_dedup: Vec<_> = worktree_relative_paths.into_iter().collect();
stats.record("project_dedup", paths_dedup.len());
let event = InternalEvent::ProjectFilesChange(project_id, paths_dedup);
if out.send(event).is_err() {
tracing::info!("channel closed - stopping file watcher");
break 'outer;
}
}
}
}
}
});
Ok(())
}
#[cfg(target_family = "unix")]
fn is_interesting_kind(kind: notify::EventKind) -> bool {
matches!(
kind,
notify::EventKind::Create(notify::event::CreateKind::File)
| notify::EventKind::Modify(notify::event::ModifyKind::Data(_))
| notify::EventKind::Modify(notify::event::ModifyKind::Name(_))
| notify::EventKind::Remove(notify::event::RemoveKind::File)
)
}
#[cfg(target_os = "windows")]
fn is_interesting_kind(kind: notify::EventKind) -> bool {
matches!(
kind,
notify::EventKind::Create(_) | notify::EventKind::Modify(_) | notify::EventKind::Remove(_)
)
}
/// A classification for a changed file.
enum FileKind {
/// A file in the `.git` repository of the current project itself.
Git,
/// Like `Git`, but shouldn't have any effect.
GitUninteresting,
/// A file in the worktree of the current project.
Project,
/// A file that was ignored in the project, and thus shouldn't trigger a computation.
ProjectIgnored,
}
fn classify_file(git_repo: &git::Repository, file_path: &Path) -> FileKind {
if let Ok(check_file_path) = file_path.strip_prefix(git_repo.path()) {
if check_file_path == Path::new("FETCH_HEAD")
|| check_file_path == Path::new("logs/HEAD")
|| check_file_path == Path::new("HEAD")
|| check_file_path == Path::new("GB_FLUSH")
|| check_file_path == Path::new("index")
{
FileKind::Git
} else {
FileKind::GitUninteresting
}
} else if git_repo.is_path_ignored(file_path).unwrap_or(false) {
FileKind::ProjectIgnored
} else {
FileKind::Project
}
}

View File

@ -0,0 +1,425 @@
mod calculate_deltas;
mod index;
mod push_project_to_gitbutler;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::{path, time};
use anyhow::{bail, Context, Result};
use gitbutler_core::projects::ProjectId;
use gitbutler_core::sessions::SessionId;
use gitbutler_core::virtual_branches::VirtualBranches;
use gitbutler_core::{
assets, deltas, gb_repository, git, project_repository, projects, reader, sessions, users,
virtual_branches,
};
use tauri::{AppHandle, Manager};
use tracing::instrument;
use super::events;
use crate::{analytics, events as app_events};
// NOTE: This is `Clone` as each incoming event is spawned onto a thread for processing.
#[derive(Clone)]
pub struct Handler {
// The following fields our currently required state as we are running in the background
// and access it as filesystem events are processed. It's still to be decided how granular it
// should be, and I can imagine having a top-level `app` handle that keeps the application state of
// the tauri app, assuming that such application would not be `Send + Sync` everywhere and thus would
// need extra protection.
users: users::Controller,
analytics: analytics::Client,
local_data_dir: path::PathBuf,
projects: projects::Controller,
vbranch_controller: virtual_branches::Controller,
assets_proxy: assets::Proxy,
sessions_db: sessions::Database,
deltas_db: deltas::Database,
/// A function to send events - decoupled from app-handle for testing purposes.
#[allow(clippy::type_complexity)]
send_event: Arc<dyn Fn(&crate::events::Event) -> Result<()> + Send + Sync + 'static>,
}
impl Handler {
pub fn from_app(app: &AppHandle) -> Result<Self, anyhow::Error> {
let app_data_dir = app
.path_resolver()
.app_data_dir()
.context("failed to get app data dir")?;
let analytics = app
.try_state::<analytics::Client>()
.map_or(analytics::Client::default(), |client| {
client.inner().clone()
});
let users = app.state::<users::Controller>().inner().clone();
let projects = app.state::<projects::Controller>().inner().clone();
let vbranches = app.state::<virtual_branches::Controller>().inner().clone();
let assets_proxy = app.state::<assets::Proxy>().inner().clone();
let sessions_db = app.state::<sessions::Database>().inner().clone();
let deltas_db = app.state::<deltas::Database>().inner().clone();
Ok(Handler::new(
app_data_dir.clone(),
analytics,
users,
projects,
vbranches,
assets_proxy,
sessions_db,
deltas_db,
{
let app = app.clone();
move |event: &crate::events::Event| event.send(&app)
},
))
}
}
impl Handler {
/// A constructor whose primary use is the test-suite.
#[allow(clippy::too_many_arguments)]
pub fn new(
local_data_dir: PathBuf,
analytics: analytics::Client,
users: users::Controller,
projects: projects::Controller,
vbranch_controller: virtual_branches::Controller,
assets_proxy: assets::Proxy,
sessions_db: sessions::Database,
deltas_db: deltas::Database,
send_event: impl Fn(&app_events::Event) -> Result<()> + Send + Sync + 'static,
) -> Self {
Handler {
local_data_dir,
analytics,
users,
projects,
vbranch_controller,
assets_proxy,
sessions_db,
deltas_db,
send_event: Arc::new(send_event),
}
}
/// Handle the events that come in from the filesystem, or the public API.
#[instrument(skip(self, now), fields(event = %event), err(Debug))]
pub(super) async fn handle(
&self,
event: events::InternalEvent,
now: time::SystemTime,
) -> Result<()> {
match event {
events::InternalEvent::ProjectFilesChange(project_id, path) => {
self.recalculate_everything(path, project_id).await
}
events::InternalEvent::GitFilesChange(project_id, paths) => self
.git_files_change(paths, project_id)
.await
.context("failed to handle git file change event"),
events::InternalEvent::PushGitbutlerData(project_id) => self
.push_gb_data(project_id)
.context("failed to push gitbutler data"),
events::InternalEvent::FetchGitbutlerData(project_id) => self
.fetch_gb_data(project_id, now)
.await
.context("failed to fetch gitbutler data"),
events::InternalEvent::Flush(project_id, session) => self
.flush_session(project_id, &session)
.await
.context("failed to handle flush session event"),
events::InternalEvent::CalculateVirtualBranches(project_id) => self
.calculate_virtual_branches(project_id)
.await
.context("failed to handle virtual branch event"),
}
}
}
impl Handler {
fn emit_app_event(&self, event: &crate::events::Event) -> Result<()> {
(self.send_event)(event).context("failed to send event")
}
fn emit_session_file(
&self,
project_id: ProjectId,
session_id: SessionId,
file_path: &Path,
contents: Option<&reader::Content>,
) -> Result<()> {
self.emit_app_event(&app_events::Event::file(
project_id,
session_id,
&file_path.display().to_string(),
contents,
))
}
fn send_analytics_event_none_blocking(&self, event: &analytics::Event) -> Result<()> {
if let Some(user) = self.users.get_user().context("failed to get user")? {
self.analytics
.send_non_anonymous_event_nonblocking(&user, event);
}
Ok(())
}
async fn flush_session(
&self,
project_id: ProjectId,
session: &sessions::Session,
) -> Result<()> {
let project = self
.projects
.get(&project_id)
.context("failed to get project")?;
let user = self.users.get_user()?;
let project_repository =
project_repository::Repository::open(&project).context("failed to open repository")?;
let gb_repo = gb_repository::Repository::open(
&self.local_data_dir,
&project_repository,
user.as_ref(),
)
.context("failed to open repository")?;
let session = gb_repo
.flush_session(&project_repository, session, user.as_ref())
.context(format!("failed to flush session {}", session.id))?;
self.index_session(project_id, &session)?;
let push_gb_data = tokio::task::spawn_blocking({
let this = self.clone();
move || this.push_gb_data(project_id)
});
self.push_project_to_gitbutler(project_id, 1000).await?;
push_gb_data.await??;
Ok(())
}
#[instrument(skip(self, project_id))]
async fn calculate_virtual_branches(&self, project_id: ProjectId) -> Result<()> {
match self
.vbranch_controller
.list_virtual_branches(&project_id)
.await
{
Ok((branches, skipped_files)) => {
let branches = self.assets_proxy.proxy_virtual_branches(branches).await;
self.emit_app_event(&app_events::Event::virtual_branches(
project_id,
&VirtualBranches {
branches,
skipped_files,
},
))
}
Err(err) if err.is::<virtual_branches::errors::VerifyError>() => Ok(()),
Err(err) => Err(err.context("failed to list virtual branches").into()),
}
}
/// NOTE: this is an honest non-async function, and it should stay that way to avoid
/// dealing with git2 repositories across await points, which aren't `Send`.
fn push_gb_data(&self, project_id: ProjectId) -> Result<()> {
let user = self.users.get_user()?;
let project = self.projects.get(&project_id)?;
let project_repository =
project_repository::Repository::open(&project).context("failed to open repository")?;
let gb_repo = gb_repository::Repository::open(
&self.local_data_dir,
&project_repository,
user.as_ref(),
)
.context("failed to open repository")?;
gb_repo
.push(user.as_ref())
.context("failed to push gb repo")
}
pub async fn fetch_gb_data(&self, project_id: ProjectId, now: time::SystemTime) -> Result<()> {
let user = self.users.get_user()?;
let project = self
.projects
.get(&project_id)
.context("failed to get project")?;
if !project.api.as_ref().map(|api| api.sync).unwrap_or_default() {
bail!("sync disabled");
}
let project_repository =
project_repository::Repository::open(&project).context("failed to open repository")?;
let gb_repo = gb_repository::Repository::open(
&self.local_data_dir,
&project_repository,
user.as_ref(),
)
.context("failed to open repository")?;
let sessions_before_fetch = gb_repo
.get_sessions_iterator()?
.filter_map(Result::ok)
.collect::<Vec<_>>();
let policy = backoff::ExponentialBackoffBuilder::new()
.with_max_elapsed_time(Some(time::Duration::from_secs(10 * 60)))
.build();
let fetch_result = backoff::retry(policy, || {
gb_repo.fetch(user.as_ref()).map_err(|err| {
match err {
gb_repository::RemoteError::Network => backoff::Error::permanent(err),
err @ gb_repository::RemoteError::Other(_) => {
tracing::warn!(%project_id, ?err, will_retry = true, "failed to fetch project data");
backoff::Error::transient(err)
}
}
})
});
let fetch_result = match fetch_result {
Ok(()) => projects::FetchResult::Fetched { timestamp: now },
Err(backoff::Error::Permanent(gb_repository::RemoteError::Network)) => {
projects::FetchResult::Error {
timestamp: now,
error: "network error".to_string(),
}
}
Err(error) => {
tracing::error!(%project_id, ?error, will_retry=false, "failed to fetch gitbutler data");
projects::FetchResult::Error {
timestamp: now,
error: error.to_string(),
}
}
};
self.projects
.update(&projects::UpdateRequest {
id: project_id,
gitbutler_data_last_fetched: Some(fetch_result),
..Default::default()
})
.await
.context("failed to update fetched result")?;
let sessions_after_fetch = gb_repo.get_sessions_iterator()?.filter_map(Result::ok);
let new_sessions = sessions_after_fetch.filter(|s| !sessions_before_fetch.contains(s));
for session in new_sessions {
self.index_session(project_id, &session)?;
}
Ok(())
}
#[instrument(skip(self, paths, project_id), fields(paths = paths.len()))]
async fn recalculate_everything(
&self,
paths: Vec<PathBuf>,
project_id: ProjectId,
) -> Result<()> {
let calc_deltas = tokio::task::spawn_blocking({
let this = self.clone();
move || this.calculate_deltas(paths, project_id)
});
self.calculate_virtual_branches(project_id).await?;
calc_deltas.await??;
Ok(())
}
pub async fn git_file_change(
&self,
path: impl Into<PathBuf>,
project_id: ProjectId,
) -> Result<()> {
self.git_files_change(vec![path.into()], project_id).await
}
pub async fn git_files_change(&self, paths: Vec<PathBuf>, project_id: ProjectId) -> Result<()> {
let project = self
.projects
.get(&project_id)
.context("failed to get project")?;
let open_projects_repository = || {
project_repository::Repository::open(&project)
.context("failed to open project repository for project")
};
for path in paths {
let Some(file_name) = path.to_str() else {
continue;
};
match file_name {
"FETCH_HEAD" => {
self.emit_app_event(&app_events::Event::git_fetch(project_id))?;
self.calculate_virtual_branches(project_id).await?;
}
"logs/HEAD" => {
self.emit_app_event(&app_events::Event::git_activity(project.id))?;
}
"GB_FLUSH" => {
let user = self.users.get_user()?;
let project_repository = open_projects_repository()?;
let gb_repo = gb_repository::Repository::open(
&self.local_data_dir,
&project_repository,
user.as_ref(),
)
.context("failed to open repository")?;
let gb_flush_path = project.path.join(".git/GB_FLUSH");
if gb_flush_path.exists() {
if let Err(err) = std::fs::remove_file(&gb_flush_path) {
tracing::error!(%project_id, path = %gb_flush_path.display(), "GB_FLUSH file delete error: {err}");
}
if let Some(current_session) = gb_repo
.get_current_session()
.context("failed to get current session")?
{
self.flush_session(project.id, &current_session).await?;
}
}
}
"HEAD" => {
let project_repository = open_projects_repository()?;
let head_ref = project_repository
.get_head()
.context("failed to get head")?;
let head_ref_name = head_ref.name().context("failed to get head name")?;
if head_ref_name.to_string() != "refs/heads/gitbutler/integration" {
let mut integration_reference = project_repository
.git_repository
.find_reference(&git::Refname::from(git::LocalRefname::new(
"gitbutler/integration",
None,
)))?;
integration_reference.delete()?;
}
if let Some(head) = head_ref.name() {
self.send_analytics_event_none_blocking(&analytics::Event::HeadChange {
project_id,
reference_name: head_ref_name.to_string(),
})?;
self.emit_app_event(&app_events::Event::git_head(
project_id,
&head.to_string(),
))?;
}
}
"index" => {
self.emit_app_event(&app_events::Event::git_index(project.id))?;
}
_ => {}
}
}
Ok(())
}
}

View File

@ -0,0 +1,178 @@
use anyhow::{Context, Result};
use gitbutler_core::{
deltas, gb_repository, project_repository, projects::ProjectId, reader, sessions,
};
use std::num::NonZeroUsize;
use std::path::{Path, PathBuf};
use tracing::instrument;
impl super::Handler {
#[instrument(skip(self, paths, project_id))]
pub fn calculate_deltas(&self, paths: Vec<PathBuf>, project_id: ProjectId) -> Result<()> {
let make_processor = || -> Result<_> {
let project = self
.projects
.get(&project_id)
.context("failed to get project")?;
let project_repository = project_repository::Repository::open(&project)
.with_context(|| "failed to open project repository for project")?;
let user = self.users.get_user().context("failed to get user")?;
let gb_repository = gb_repository::Repository::open(
&self.local_data_dir,
&project_repository,
user.as_ref(),
)
.context("failed to open gb repository")?;
// If current session's branch is not the same as the project's head, flush it first.
if let Some(session) = gb_repository
.get_current_session()
.context("failed to get current session")?
{
let project_head = project_repository
.get_head()
.context("failed to get head")?;
if session.meta.branch != project_head.name().map(|n| n.to_string()) {
gb_repository
.flush_session(&project_repository, &session, user.as_ref())
.context(format!("failed to flush session {}", session.id))?;
}
}
let current_session = gb_repository
.get_or_create_current_session()
.context("failed to get or create current session")?;
let session = current_session.clone();
let process = move |path: &Path| -> Result<bool> {
let _span = tracing::span!(tracing::Level::TRACE, "processing", ?path).entered();
let current_session_reader =
sessions::Reader::open(&gb_repository, &current_session)
.context("failed to get session reader")?;
let deltas_reader = deltas::Reader::new(&current_session_reader);
let writer =
deltas::Writer::new(&gb_repository).context("failed to open deltas writer")?;
let current_wd_file_content = match Self::file_content(&project_repository, path) {
Ok(content) => Some(content),
Err(reader::Error::NotFound) => None,
Err(err) => Err(err).context("failed to get file content")?,
};
let latest_file_content = match current_session_reader.file(path) {
Ok(content) => Some(content),
Err(reader::Error::NotFound) => None,
Err(err) => Err(err).context("failed to get file content")?,
};
let current_deltas = deltas_reader
.read_file(path)
.context("failed to get file deltas")?;
let mut text_doc = deltas::Document::new(
latest_file_content.as_ref(),
current_deltas.unwrap_or_default(),
)?;
let new_delta = text_doc
.update(current_wd_file_content.as_ref())
.context("failed to calculate new deltas")?;
let Some(new_delta) = new_delta else {
return Ok(false);
};
let deltas = text_doc.get_deltas();
writer
.write(path, &deltas)
.context("failed to write deltas")?;
match &current_wd_file_content {
Some(reader::Content::UTF8(text)) => writer.write_wd_file(path, text),
Some(_) => writer.write_wd_file(path, ""),
None => writer.remove_wd_file(path),
}?;
let session_id = current_session.id;
self.emit_session_file(project_id, session_id, path, latest_file_content.as_ref())?;
self.index_deltas(
project_id,
session_id,
path,
std::slice::from_ref(&new_delta),
)
.context("failed to index deltas")?;
self.emit_app_event(&app_events::Event::deltas(
project_id,
session_id,
std::slice::from_ref(&new_delta),
path,
))?;
Ok(true)
};
Ok((process, session))
};
let num_paths = paths.len();
let num_no_delta = std::thread::scope(|scope| -> Result<usize> {
let num_threads = std::thread::available_parallelism()
.unwrap_or(NonZeroUsize::new(1).unwrap())
.get()
.min(paths.len());
let mut num_no_delta = 0;
let current_session = if num_threads < 2 {
let (process, session) = make_processor()?;
for path in paths {
if !process(path.as_path())? {
num_no_delta += 1;
}
}
session
} else {
let (threads, tx) = {
let (tx, rx) = crossbeam_channel::bounded::<PathBuf>(num_threads);
let threads: Vec<_> = (0..num_threads)
.map(|id| {
std::thread::Builder::new()
.name(format!("gitbutler_delta_thread_{id}"))
.stack_size(512 * 1024)
.spawn_scoped(scope, {
let rx = rx.clone();
|| -> Result<usize> {
let mut num_no_delta = 0;
let (process, _) = make_processor()?;
for path in rx {
if !process(path.as_path())? {
num_no_delta += 1;
}
}
Ok(num_no_delta)
}
})
.expect("worker thread can be created")
})
.collect();
(threads, tx)
};
for path in paths {
tx.send(path).expect("many receivers");
}
drop(tx);
for thread in threads {
num_no_delta += thread.join().unwrap()?;
}
let (_, session) = make_processor()?;
session
};
self.index_session(project_id, &current_session)?;
Ok(num_no_delta)
})?;
tracing::debug!(%project_id, paths_without_deltas = num_no_delta, paths_with_delta = num_paths - num_no_delta);
Ok(())
}
fn file_content(
project_repository: &project_repository::Repository,
path: &Path,
) -> Result<reader::Content, reader::Error> {
let full_path = project_repository.project().path.join(path);
if !full_path.exists() {
return Err(reader::Error::NotFound);
}
Ok(reader::Content::read_from_file(&full_path)?)
}
}

View File

@ -0,0 +1,92 @@
use std::path::Path;
use anyhow::{Context, Result};
use gitbutler_core::{
deltas, gb_repository, project_repository,
projects::ProjectId,
sessions::{self, SessionId},
};
use crate::events as app_events;
impl super::Handler {
pub(super) fn index_deltas(
&self,
project_id: ProjectId,
session_id: SessionId,
file_path: &Path,
deltas: &[deltas::Delta],
) -> Result<()> {
self.deltas_db
.insert(&project_id, &session_id, file_path, deltas)
.context("failed to insert deltas into database")
}
pub(crate) fn reindex(&self, project_id: ProjectId) -> Result<()> {
let user = self.users.get_user()?;
let project = self.projects.get(&project_id)?;
let project_repository =
project_repository::Repository::open(&project).context("failed to open repository")?;
let gb_repository = gb_repository::Repository::open(
&self.local_data_dir,
&project_repository,
user.as_ref(),
)
.context("failed to open repository")?;
let sessions_iter = gb_repository.get_sessions_iterator()?;
for session in sessions_iter {
self.process_session(&gb_repository, &session?)?;
}
Ok(())
}
pub(super) fn index_session(
&self,
project_id: ProjectId,
session: &sessions::Session,
) -> Result<()> {
let project = self.projects.get(&project_id)?;
let project_repository =
project_repository::Repository::open(&project).context("failed to open repository")?;
let user = self.users.get_user()?;
let gb_repository = gb_repository::Repository::open(
&self.local_data_dir,
&project_repository,
user.as_ref(),
)
.context("failed to open repository")?;
self.process_session(&gb_repository, session)
}
fn process_session(
&self,
gb_repository: &gb_repository::Repository,
session: &sessions::Session,
) -> Result<()> {
let project_id = gb_repository.get_project_id();
// now, index session if it has changed to the database.
let from_db = self.sessions_db.get_by_id(&session.id)?;
if from_db.map_or(false, |from_db| from_db == *session) {
return Ok(());
}
self.sessions_db
.insert(project_id, &[session])
.context("failed to insert session into database")?;
let session_reader = sessions::Reader::open(gb_repository, session)?;
let deltas_reader = deltas::Reader::new(&session_reader);
for (file_path, deltas) in deltas_reader
.read(None)
.context("could not list deltas for session")?
{
self.index_deltas(*project_id, session.id, &file_path, &deltas)?;
}
(self.send_event)(&app_events::Event::session(*project_id, session))?;
Ok(())
}
}

View File

@ -0,0 +1,219 @@
use std::time;
use anyhow::{Context, Result};
use gitbutler_core::id::Id;
use gitbutler_core::{
gb_repository,
git::{self, Oid, Repository},
project_repository,
projects::{self, CodePushState, ProjectId},
users,
};
use itertools::Itertools;
impl super::Handler {
pub async fn push_project_to_gitbutler(
&self,
project_id: ProjectId,
batch_size: usize,
) -> Result<()> {
let project = self
.projects
.get(&project_id)
.context("failed to get project")?;
if !project.is_sync_enabled() || !project.has_code_url() {
return Ok(());
}
let user = self.users.get_user()?;
let project_repository =
project_repository::Repository::open(&project).context("failed to open repository")?;
let gb_code_last_commit = project
.gitbutler_code_push_state
.as_ref()
.map(|state| &state.id)
.copied();
let gb_repository = gb_repository::Repository::open(
&self.local_data_dir,
&project_repository,
user.as_ref(),
)?;
let default_target = gb_repository
.default_target()
.context("failed to open gb repo")?
.context("failed to get default target")?;
let target_changed = gb_code_last_commit.map_or(true, |id| id != default_target.sha);
if target_changed {
match self
.push_target(
&project_repository,
&default_target,
gb_code_last_commit,
project_id,
user.as_ref(),
batch_size,
)
.await
{
Ok(()) => {}
Err(project_repository::RemoteError::Network) => return Ok(()),
Err(err) => return Err(err).context("failed to push"),
};
}
tokio::task::spawn_blocking(move || -> Result<()> {
match push_all_refs(&project_repository, user.as_ref(), project_id) {
Ok(()) => Ok(()),
Err(project_repository::RemoteError::Network) => Ok(()),
Err(err) => Err(err).context("failed to push"),
}
})
.await??;
// make sure last push time is updated
self.update_project(project_id, default_target.sha).await?;
Ok(())
}
}
/// Currently required to make functionality testable without requiring a `Handler` with all of its state.
impl super::Handler {
async fn update_project(
&self,
project_id: Id<projects::Project>,
id: Oid,
) -> Result<(), project_repository::RemoteError> {
self.projects
.update(&projects::UpdateRequest {
id: project_id,
gitbutler_code_push_state: Some(CodePushState {
id,
timestamp: time::SystemTime::now(),
}),
..Default::default()
})
.await
.context("failed to update last push")?;
Ok(())
}
async fn push_target(
&self,
project_repository: &project_repository::Repository,
default_target: &gitbutler_core::virtual_branches::target::Target,
gb_code_last_commit: Option<Oid>,
project_id: Id<projects::Project>,
user: Option<&users::User>,
batch_size: usize,
) -> Result<(), project_repository::RemoteError> {
let ids = batch_rev_walk(
&project_repository.git_repository,
batch_size,
default_target.sha,
gb_code_last_commit,
)?;
tracing::info!(
%project_id,
batches=%ids.len(),
"batches left to push",
);
let id_count = ids.len();
for (idx, id) in ids.iter().enumerate().rev() {
let refspec = format!("+{}:refs/push-tmp/{}", id, project_id);
project_repository.push_to_gitbutler_server(user, &[&refspec])?;
self.update_project(project_id, *id).await?;
tracing::info!(
%project_id,
i = id_count.saturating_sub(idx),
total = id_count,
"project batch pushed",
);
}
project_repository.push_to_gitbutler_server(
user,
&[&format!("+{}:refs/{}", default_target.sha, project_id)],
)?;
//TODO: remove push-tmp ref
tracing::info!(
%project_id,
"project target ref fully pushed",
);
Ok(())
}
}
fn push_all_refs(
project_repository: &project_repository::Repository,
user: Option<&users::User>,
project_id: Id<projects::Project>,
) -> Result<(), project_repository::RemoteError> {
let gb_references = collect_refs(project_repository)?;
let all_refs: Vec<_> = gb_references
.iter()
.filter(|r| {
matches!(
r,
git::Refname::Remote(_) | git::Refname::Virtual(_) | git::Refname::Local(_)
)
})
.map(|r| format!("+{}:{}", r, r))
.collect();
let all_refs: Vec<_> = all_refs.iter().map(String::as_str).collect();
let anything_pushed = project_repository.push_to_gitbutler_server(user, &all_refs)?;
if anything_pushed {
tracing::info!(
%project_id,
"refs pushed",
);
}
Ok(())
}
fn collect_refs(
project_repository: &project_repository::Repository,
) -> anyhow::Result<Vec<git::Refname>> {
Ok(project_repository
.git_repository
.references_glob("refs/*")?
.flatten()
.filter_map(|r| r.name())
.collect::<Vec<_>>())
}
fn batch_rev_walk(
repo: &Repository,
batch_size: usize,
from: Oid,
until: Option<Oid>,
) -> Result<Vec<Oid>> {
let mut revwalk = repo.revwalk().context("failed to create revwalk")?;
revwalk
.push(from.into())
.context(format!("failed to push {}", from))?;
if let Some(oid) = until {
revwalk
.hide(oid.into())
.context(format!("failed to hide {}", oid))?;
}
let mut oids = Vec::new();
oids.push(from);
let from = from.into();
for batch in &revwalk.chunks(batch_size) {
let Some(oid) = batch.last() else { continue };
let oid = oid.context("failed to get oid")?;
if oid != from {
oids.push(oid.into());
}
}
Ok(oids)
}

View File

@ -0,0 +1,174 @@
//! Implement the file-monitoring agent that informs about changes in interesting locations.
#![deny(missing_docs)]
#![allow(clippy::doc_markdown, clippy::missing_errors_doc)]
#![feature(slice_as_chunks)]
mod events;
pub use events::Event;
use events::InternalEvent;
mod file_monitor;
mod handler;
pub use handler::Handler;
use std::path::Path;
use std::{sync::Arc, time};
use anyhow::{Context, Result};
use futures::executor::block_on;
use gitbutler_core::projects::{self, Project, ProjectId};
use tauri::AppHandle;
use tokio::{
sync::mpsc::{unbounded_channel, UnboundedSender},
task,
};
use tokio_util::sync::CancellationToken;
use tracing::instrument;
/// Note that this type is managed in Tauri and thus needs to be send and sync.
#[derive(Clone)]
pub struct Watchers {
/// NOTE: This handle is required for this type to be self-contained as it's used by `core` through a trait.
app_handle: AppHandle,
/// The watcher of the currently active project.
/// NOTE: This is a `tokio` mutex as this needs to lock the inner option from within async.
watcher: Arc<tokio::sync::Mutex<Option<WatcherHandle>>>,
}
impl Watchers {
pub fn new(app_handle: AppHandle) -> Self {
Self {
app_handle,
watcher: Default::default(),
}
}
#[instrument(skip(self, project), err(Debug))]
pub fn watch(&self, project: &projects::Project) -> Result<()> {
let handler = handler::Handler::from_app(&self.app_handle)?;
let project_id = project.id;
let project_path = project.path.clone();
let handle = watch_in_background(handler, project_path, project_id)?;
block_on(self.watcher.lock()).replace(handle);
Ok(())
}
pub async fn post(&self, event: Event) -> Result<()> {
let watcher = self.watcher.lock().await;
if let Some(handle) = watcher
.as_ref()
.filter(|watcher| watcher.project_id == event.project_id())
{
handle.post(event).await.context("failed to post event")
} else {
Err(anyhow::anyhow!("watcher not found",))
}
}
pub async fn stop(&self, project_id: ProjectId) {
let mut handle = self.watcher.lock().await;
if handle
.as_ref()
.map_or(false, |handle| handle.project_id == project_id)
{
handle.take();
}
}
}
#[async_trait::async_trait]
impl gitbutler_core::projects::Watchers for Watchers {
fn watch(&self, project: &Project) -> Result<()> {
Watchers::watch(self, project)
}
async fn stop(&self, id: ProjectId) {
Watchers::stop(self, id).await
}
async fn fetch_gb_data(&self, id: ProjectId) -> Result<()> {
self.post(Event::FetchGitbutlerData(id)).await
}
async fn push_gb_data(&self, id: ProjectId) -> Result<()> {
self.post(Event::PushGitbutlerData(id)).await
}
}
/// An abstraction over a link to the spawned watcher, which runs in the background.
struct WatcherHandle {
/// A way to post events and interact with the actual handler in the background.
tx: UnboundedSender<InternalEvent>,
/// The id of the project we are watching.
project_id: ProjectId,
/// A way to tell the background process to stop handling events.
cancellation_token: CancellationToken,
}
impl Drop for WatcherHandle {
fn drop(&mut self) {
self.cancellation_token.cancel();
}
}
impl WatcherHandle {
pub async fn post(&self, event: Event) -> Result<()> {
self.tx.send(event.into()).context("failed to send event")?;
Ok(())
}
}
/// Run our file watcher processing loop in the background and let `handler` deal with them.
/// Return a handle to the watcher to allow interactions while it's running in the background.
/// Drop the handle to stop the watcher.
///
/// ### Important
///
/// It runs in such a way that each filesystem event is processed concurrently with others, which is why
/// spamming massive amounts of events should be avoided!
fn watch_in_background(
handler: handler::Handler,
path: impl AsRef<Path>,
project_id: ProjectId,
) -> Result<WatcherHandle, anyhow::Error> {
let (events_out, mut events_in) = unbounded_channel();
file_monitor::spawn(project_id, path.as_ref(), events_out.clone())?;
handler.reindex(project_id)?;
let cancellation_token = CancellationToken::new();
let handle = WatcherHandle {
tx: events_out,
project_id,
cancellation_token: cancellation_token.clone(),
};
let handle_event = move |event: InternalEvent| -> Result<()> {
let handler = handler.clone();
// NOTE: Traditional parallelization (blocking) is required as `tokio::spawn()` on
// the `handler.handle()` future isn't `Send` as it keeps non-Send things
// across await points. Further, there is a fair share of `sync` IO happening
// as well, so nothing can really be done here.
task::spawn_blocking(move || {
futures::executor::block_on(async move {
handler.handle(event, time::SystemTime::now()).await.ok();
});
});
Ok(())
};
tokio::spawn(async move {
loop {
tokio::select! {
Some(event) = events_in.recv() => handle_event(event)?,
() = cancellation_token.cancelled() => {
break;
}
}
}
Ok::<_, anyhow::Error>(())
});
Ok(handle)
}

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,57 @@
use std::time::SystemTime;
use gitbutler_core::projects;
use pretty_assertions::assert_eq;
use crate::watcher::handler::support::Fixture;
use crate::watcher::handler::test_remote_repository;
use gitbutler_testsupport::Case;
#[tokio::test]
async fn fetch_success() -> anyhow::Result<()> {
let mut fixture = Fixture::default();
{
let handler = fixture.new_handler();
let Case { project, .. } = &fixture.new_case();
let (cloud, _tmp) = test_remote_repository()?;
let api_project = projects::ApiProject {
name: "test-sync".to_string(),
description: None,
repository_id: "123".to_string(),
git_url: cloud.path().to_str().unwrap().to_string(),
code_git_url: None,
created_at: 0_i32.to_string(),
updated_at: 0_i32.to_string(),
sync: true,
};
fixture
.projects
.update(&projects::UpdateRequest {
id: project.id,
api: Some(api_project.clone()),
..Default::default()
})
.await?;
handler
.fetch_gb_data(project.id, SystemTime::now())
.await
.unwrap();
}
assert_eq!(fixture.events().len(), 0);
Ok(())
}
#[tokio::test]
async fn fetch_fail_no_sync() {
let mut fixture = Fixture::default();
{
let handler = fixture.new_handler();
let Case { project, .. } = &fixture.new_case();
let res = handler.fetch_gb_data(project.id, SystemTime::now()).await;
assert_eq!(&res.unwrap_err().to_string(), "sync disabled");
}
assert_eq!(fixture.events().len(), 0);
}

View File

@ -0,0 +1,94 @@
use std::fs;
use anyhow::Result;
use gitbutler_core::projects;
use gitbutler_tauri::watcher;
use pretty_assertions::assert_eq;
use crate::watcher::handler::support::Fixture;
use gitbutler_testsupport::Case;
#[tokio::test]
async fn flush_session() -> Result<()> {
let mut fixture = Fixture::default();
{
let case = fixture.new_case();
let Case {
project,
gb_repository,
..
} = &case;
assert!(gb_repository.get_current_session()?.is_none());
let handler = create_new_session_via_new_file(project, &mut fixture);
assert!(gb_repository.get_current_session()?.is_some());
let flush_file_path = project.path.join(".git/GB_FLUSH");
fs::write(flush_file_path.as_path(), "")?;
handler.git_file_change("GB_FLUSH", project.id).await?;
assert!(!flush_file_path.exists(), "flush file deleted");
}
let events = fixture.events();
assert_eq!(events.len(), 4);
assert!(events[0].name().ends_with("/files"));
assert!(events[1].name().ends_with("/deltas"));
assert!(events[2].name().ends_with("/sessions"));
assert!(events[3].name().ends_with("/sessions"));
Ok(())
}
#[tokio::test]
async fn do_not_flush_session_if_file_is_missing() -> Result<()> {
let mut fixture = Fixture::default();
{
let Case {
project,
gb_repository,
..
} = &fixture.new_case();
assert!(gb_repository.get_current_session()?.is_none());
let handler = create_new_session_via_new_file(project, &mut fixture);
assert!(gb_repository.get_current_session()?.is_some());
handler.git_file_change("GB_FLUSH", project.id).await?;
}
let events = fixture.events();
assert_eq!(events.len(), 3);
assert!(events[0].name().ends_with("/files"));
assert!(events[1].name().ends_with("/deltas"));
assert!(events[2].name().ends_with("/sessions"));
Ok(())
}
#[tokio::test]
async fn flush_deletes_flush_file_without_session_to_flush() -> Result<()> {
let mut fixture = Fixture::default();
{
let handler = fixture.new_handler();
let Case { project, .. } = &fixture.new_case();
let flush_file_path = project.path.join(".git/GB_FLUSH");
fs::write(flush_file_path.as_path(), "")?;
handler.git_file_change("GB_FLUSH", project.id).await?;
assert!(!flush_file_path.exists(), "flush file deleted");
}
assert_eq!(fixture.events().len(), 0);
Ok(())
}
fn create_new_session_via_new_file(
project: &projects::Project,
fixture: &mut Fixture,
) -> watcher::Handler {
fs::write(project.path.join("test.txt"), "test").unwrap();
let handler = fixture.new_handler();
handler
.calculate_deltas(vec!["test.txt".into()], project.id)
.unwrap();
handler
}

View File

@ -0,0 +1,104 @@
use tempfile::TempDir;
mod support {
use gitbutler_core::{assets, deltas, git, sessions, virtual_branches};
use gitbutler_tauri::{analytics, watcher};
use tempfile::TempDir;
/// Like [`gitbutler_testsupport::Suite`], but with all the instances needed to build a handler
pub struct Fixture {
inner: gitbutler_testsupport::Suite,
pub sessions_db: sessions::Database,
pub deltas_db: deltas::Database,
pub vbranch_controller: virtual_branches::Controller,
pub assets_proxy: assets::Proxy,
/// Keeps events emitted from the last created handler.
events: Option<std::sync::mpsc::Receiver<gitbutler_tauri::Event>>,
/// Storage for the databases, to be dropped last.
_tmp: TempDir,
}
impl std::ops::Deref for Fixture {
type Target = gitbutler_testsupport::Suite;
fn deref(&self) -> &Self::Target {
&self.inner
}
}
impl Default for Fixture {
fn default() -> Self {
let (db, tmp) = gitbutler_testsupport::test_database();
let inner = gitbutler_testsupport::Suite::default();
let sessions_db = sessions::Database::new(db.clone());
let deltas_db = deltas::Database::new(db);
let git_credentials_helper =
git::credentials::Helper::new(inner.keys.clone(), inner.users.clone(), None);
let vbranch_controller = virtual_branches::Controller::new(
inner.local_app_data().to_owned(),
inner.projects.clone(),
inner.users.clone(),
inner.keys.clone(),
git_credentials_helper,
);
let assets_proxy = assets::Proxy::new(tmp.path().to_owned());
Fixture {
inner,
sessions_db,
deltas_db,
vbranch_controller,
assets_proxy,
events: None,
_tmp: tmp,
}
}
}
impl Fixture {
/// Must be mut as handler events are collected into the fixture automatically.
///
/// Note that this only works for the most recent created handler.
pub fn new_handler(&mut self) -> watcher::Handler {
let (tx, rx) = std::sync::mpsc::channel();
self.events = Some(rx);
watcher::Handler::new(
self.local_app_data().to_owned(),
analytics::Client::default(),
self.users.clone(),
self.projects.clone(),
self.vbranch_controller.clone(),
self.assets_proxy.clone(),
self.sessions_db.clone(),
self.deltas_db.clone(),
move |event| tx.send(event.clone()).map_err(Into::into),
)
}
/// Returns the events that were emitted to the tauri app.
pub fn events(&mut self) -> Vec<gitbutler_tauri::Event> {
let Some(rx) = self.events.as_ref() else {
return Vec::new();
};
let mut out = Vec::new();
// For safety, in case the `handler` is still alive, blocking consumption.
while let Ok(event) = rx.try_recv() {
out.push(event);
}
out
}
}
}
use gitbutler_testsupport::init_opts_bare;
fn test_remote_repository() -> anyhow::Result<(git2::Repository, TempDir)> {
let tmp = tempfile::tempdir()?;
let repo_a = git2::Repository::init_opts(&tmp, &init_opts_bare())?;
Ok((repo_a, tmp))
}
mod calculate_delta;
mod fetch_gitbutler_data;
mod git_file_change;
mod push_project_to_gitbutler;

View File

@ -0,0 +1,384 @@
use std::{collections::HashMap, path::PathBuf};
use anyhow::Result;
use gitbutler_core::{git, project_repository::LogUntil, projects};
use crate::watcher::handler::support::Fixture;
use crate::watcher::handler::test_remote_repository;
use gitbutler_testsupport::{virtual_branches::set_test_target, Case};
fn log_walk(repo: &git2::Repository, head: git::Oid) -> Vec<git::Oid> {
let mut walker = repo.revwalk().unwrap();
walker.push(head.into()).unwrap();
walker.map(|oid| oid.unwrap().into()).collect::<Vec<_>>()
}
#[tokio::test]
async fn push_error() -> Result<()> {
let mut fixture = Fixture::default();
let handler = fixture.new_handler();
let Case { project, .. } = &fixture.new_case();
let api_project = projects::ApiProject {
name: "test-sync".to_string(),
description: None,
repository_id: "123".to_string(),
git_url: String::new(),
code_git_url: Some(String::new()),
created_at: 0_i32.to_string(),
updated_at: 0_i32.to_string(),
sync: true,
};
fixture
.projects
.update(&projects::UpdateRequest {
id: project.id,
api: Some(api_project.clone()),
..Default::default()
})
.await?;
let res = handler.push_project_to_gitbutler(project.id, 100).await;
let err = res.unwrap_err();
assert_eq!(err.to_string(), "failed to get default target");
Ok(())
}
#[tokio::test]
async fn push_simple() -> Result<()> {
let mut fixture = Fixture::default();
let handler = fixture.new_handler();
let Case {
project,
gb_repository,
project_repository,
..
} = &fixture.new_case_with_files(HashMap::from([(PathBuf::from("test.txt"), "test")]));
fixture.sign_in();
set_test_target(gb_repository, project_repository).unwrap();
let target_id = gb_repository.default_target().unwrap().unwrap().sha;
let reference = project_repository.l(target_id, LogUntil::End).unwrap();
let (cloud_code, _tmp) = test_remote_repository()?;
let api_project = projects::ApiProject {
name: "test-sync".to_string(),
description: None,
repository_id: "123".to_string(),
git_url: String::new(),
code_git_url: Some(cloud_code.path().to_str().unwrap().to_string()),
created_at: 0_i32.to_string(),
updated_at: 0_i32.to_string(),
sync: true,
};
fixture
.projects
.update(&projects::UpdateRequest {
id: project.id,
api: Some(api_project.clone()),
..Default::default()
})
.await?;
cloud_code.find_commit(target_id.into()).unwrap_err();
{
handler
.push_project_to_gitbutler(project.id, 10)
.await
.unwrap();
}
cloud_code.find_commit(target_id.into()).unwrap();
let pushed = log_walk(&cloud_code, target_id);
assert_eq!(reference.len(), pushed.len());
assert_eq!(reference, pushed);
assert_eq!(
fixture
.projects
.get(&project.id)
.unwrap()
.gitbutler_code_push_state
.unwrap()
.id,
target_id
);
Ok(())
}
#[tokio::test]
async fn push_remote_ref() -> Result<()> {
let mut fixture = Fixture::default();
let handler = fixture.new_handler();
let Case {
project,
gb_repository,
project_repository,
..
} = &fixture.new_case();
fixture.sign_in();
set_test_target(gb_repository, project_repository).unwrap();
let (cloud_code, _tmp) = test_remote_repository()?;
let cloud_code: git::Repository = cloud_code.into();
let (remote_repo, _tmp) = test_remote_repository()?;
let remote_repo: git::Repository = remote_repo.into();
let last_commit = create_initial_commit(&remote_repo);
remote_repo
.reference(
&git::Refname::Local(git::LocalRefname::new("refs/heads/testbranch", None)),
last_commit,
false,
"",
)
.unwrap();
let mut remote = project_repository
.git_repository
.remote("tr", &remote_repo.path().to_str().unwrap().parse().unwrap())
.unwrap();
remote
.fetch(&["+refs/heads/*:refs/remotes/tr/*"], None)
.unwrap();
project_repository
.git_repository
.find_commit(last_commit)
.unwrap();
let api_project = projects::ApiProject {
name: "test-sync".to_string(),
description: None,
repository_id: "123".to_string(),
git_url: String::new(),
code_git_url: Some(cloud_code.path().to_str().unwrap().to_string()),
created_at: 0_i32.to_string(),
updated_at: 0_i32.to_string(),
sync: true,
};
fixture
.projects
.update(&projects::UpdateRequest {
id: project.id,
api: Some(api_project.clone()),
..Default::default()
})
.await?;
{
handler
.push_project_to_gitbutler(project.id, 10)
.await
.unwrap();
}
cloud_code.find_commit(last_commit).unwrap();
Ok(())
}
fn create_initial_commit(repo: &git::Repository) -> git::Oid {
let signature = git::Signature::now("test", "test@email.com").unwrap();
let mut index = repo.index().unwrap();
let oid = index.write_tree().unwrap();
repo.commit(
None,
&signature,
&signature,
"initial commit",
&repo.find_tree(oid).unwrap(),
&[],
)
.unwrap()
}
fn create_test_commits(repo: &git::Repository, commits: usize) -> git::Oid {
let signature = git::Signature::now("test", "test@email.com").unwrap();
let mut last = None;
for i in 0..commits {
let mut index = repo.index().unwrap();
let oid = index.write_tree().unwrap();
let head = repo.head().unwrap();
last = Some(
repo.commit(
Some(&head.name().unwrap()),
&signature,
&signature,
format!("commit {i}").as_str(),
&repo.find_tree(oid).unwrap(),
&[&repo
.find_commit(repo.refname_to_id("HEAD").unwrap())
.unwrap()],
)
.unwrap(),
);
}
last.unwrap()
}
#[tokio::test]
async fn push_batches() -> Result<()> {
let mut fixture = Fixture::default();
let handler = fixture.new_handler();
let Case {
project,
gb_repository,
project_repository,
..
} = &fixture.new_case();
fixture.sign_in();
{
let head: git::Oid = project_repository
.get_head()
.unwrap()
.peel_to_commit()
.unwrap()
.id();
let reference = project_repository.l(head, LogUntil::End).unwrap();
assert_eq!(reference.len(), 2);
let head = create_test_commits(&project_repository.git_repository, 10);
let reference = project_repository.l(head, LogUntil::End).unwrap();
assert_eq!(reference.len(), 12);
}
set_test_target(gb_repository, project_repository).unwrap();
let target_id = gb_repository.default_target().unwrap().unwrap().sha;
let reference = project_repository.l(target_id, LogUntil::End).unwrap();
let (cloud_code, _tmp) = test_remote_repository()?;
let api_project = projects::ApiProject {
name: "test-sync".to_string(),
description: None,
repository_id: "123".to_string(),
git_url: String::new(),
code_git_url: Some(cloud_code.path().to_str().unwrap().to_string()),
created_at: 0_i32.to_string(),
updated_at: 0_i32.to_string(),
sync: true,
};
fixture
.projects
.update(&projects::UpdateRequest {
id: project.id,
api: Some(api_project.clone()),
..Default::default()
})
.await?;
{
handler
.push_project_to_gitbutler(project.id, 2)
.await
.unwrap();
}
cloud_code.find_commit(target_id.into()).unwrap();
let pushed = log_walk(&cloud_code, target_id);
assert_eq!(reference.len(), pushed.len());
assert_eq!(reference, pushed);
assert_eq!(
fixture
.projects
.get(&project.id)
.unwrap()
.gitbutler_code_push_state
.unwrap()
.id,
target_id
);
Ok(())
}
#[tokio::test]
async fn push_again_no_change() -> Result<()> {
let mut fixture = Fixture::default();
let handler = fixture.new_handler();
let Case {
project,
gb_repository,
project_repository,
..
} = &fixture.new_case_with_files(HashMap::from([(PathBuf::from("test.txt"), "test")]));
fixture.sign_in();
set_test_target(gb_repository, project_repository).unwrap();
let target_id = gb_repository.default_target().unwrap().unwrap().sha;
let reference = project_repository.l(target_id, LogUntil::End).unwrap();
let (cloud_code, _tmp) = test_remote_repository()?;
let api_project = projects::ApiProject {
name: "test-sync".to_string(),
description: None,
repository_id: "123".to_string(),
git_url: String::new(),
code_git_url: Some(cloud_code.path().to_str().unwrap().to_string()),
created_at: 0_i32.to_string(),
updated_at: 0_i32.to_string(),
sync: true,
};
fixture
.projects
.update(&projects::UpdateRequest {
id: project.id,
api: Some(api_project.clone()),
..Default::default()
})
.await?;
cloud_code.find_commit(target_id.into()).unwrap_err();
{
handler
.push_project_to_gitbutler(project.id, 10)
.await
.unwrap();
}
cloud_code.find_commit(target_id.into()).unwrap();
let pushed = log_walk(&cloud_code, target_id);
assert_eq!(reference.len(), pushed.len());
assert_eq!(reference, pushed);
assert_eq!(
fixture
.projects
.get(&project.id)
.unwrap()
.gitbutler_code_push_state
.unwrap()
.id,
target_id
);
Ok(())
}

View File

@ -0,0 +1 @@
mod handler;