Merge pull request #2826 from gitbutlerapp/update-auto-fetching-mechanism-to-work-on-single-project-at-a-time

update auto fetching mechanism to work on single project at a time
This commit is contained in:
Kiril Videlov 2024-02-20 01:06:29 +01:00 committed by GitHub
commit 5024fae94a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 16 additions and 388 deletions

View File

@ -1,7 +1,6 @@
mod file_change; mod file_change;
mod tick;
use std::{path, time}; use std::path;
use anyhow::{Context, Result}; use anyhow::{Context, Result};
use tokio::{ use tokio::{
@ -17,7 +16,6 @@ use super::events;
#[derive(Clone)] #[derive(Clone)]
pub struct Dispatcher { pub struct Dispatcher {
tick_dispatcher: tick::Dispatcher,
file_change_dispatcher: file_change::Dispatcher, file_change_dispatcher: file_change::Dispatcher,
cancellation_token: CancellationToken, cancellation_token: CancellationToken,
} }
@ -33,14 +31,12 @@ pub enum RunError {
impl Dispatcher { impl Dispatcher {
pub fn new() -> Self { pub fn new() -> Self {
Self { Self {
tick_dispatcher: tick::Dispatcher::new(),
file_change_dispatcher: file_change::Dispatcher::new(), file_change_dispatcher: file_change::Dispatcher::new(),
cancellation_token: CancellationToken::new(), cancellation_token: CancellationToken::new(),
} }
} }
pub fn stop(&self) { pub fn stop(&self) {
self.tick_dispatcher.stop();
self.file_change_dispatcher.stop(); self.file_change_dispatcher.stop();
} }
@ -57,11 +53,6 @@ impl Dispatcher {
Err(error) => Err(error).context("failed to run file change dispatcher")?, Err(error) => Err(error).context("failed to run file change dispatcher")?,
}?; }?;
let mut tick_rx = self
.tick_dispatcher
.run(project_id, time::Duration::from_secs(10))
.context("failed to run tick dispatcher")?;
let (tx, rx) = channel(1); let (tx, rx) = channel(1);
let project_id = *project_id; let project_id = *project_id;
task::Builder::new() task::Builder::new()
@ -72,11 +63,6 @@ impl Dispatcher {
() = self.cancellation_token.cancelled() => { () = self.cancellation_token.cancelled() => {
break; break;
} }
Some(event) = tick_rx.recv() => {
if let Err(error) = tx.send(event).await {
tracing::error!(%project_id, ?error,"failed to send tick");
}
}
Some(event) = file_change_rx.recv() => { Some(event) = file_change_rx.recv() => {
if let Err(error) = tx.send(event).await { if let Err(error) = tx.send(event).await {
tracing::error!(%project_id, ?error,"failed to send file change"); tracing::error!(%project_id, ?error,"failed to send file change");

View File

@ -1,94 +0,0 @@
use std::time;
use anyhow::Context;
use tokio::{
sync::mpsc::{channel, Receiver},
task,
};
use tokio_util::sync::CancellationToken;
use crate::{projects::ProjectId, watcher::events};
#[derive(Debug, Clone)]
pub struct Dispatcher {
cancellation_token: CancellationToken,
}
#[derive(Debug, thiserror::Error)]
pub enum RunError {
#[error(transparent)]
Other(#[from] anyhow::Error),
}
impl Dispatcher {
pub fn new() -> Self {
Self {
cancellation_token: CancellationToken::new(),
}
}
pub fn stop(&self) {
self.cancellation_token.cancel();
}
pub fn run(
self,
project_id: &ProjectId,
interval: time::Duration,
) -> Result<Receiver<events::Event>, RunError> {
let (tx, rx) = channel(1);
let mut ticker = tokio::time::interval(interval);
task::Builder::new()
.name(&format!("{} ticker", project_id))
.spawn({
let project_id = *project_id;
async move {
tracing::debug!(%project_id, "ticker started");
loop {
ticker.tick().await;
if self.cancellation_token.is_cancelled() {
break;
}
if let Err(error) = tx.send(events::Event::Tick(project_id)).await {
tracing::error!(%project_id, ?error, "failed to send tick");
}
}
tracing::debug!(%project_id, "ticker stopped");
}
})
.context("failed to spawn ticker task")?;
Ok(rx)
}
}
#[cfg(test)]
mod tests {
use std::time::Duration;
use super::*;
#[tokio::test]
async fn test_ticker() {
let dispatcher = Dispatcher::new();
let dispatcher2 = dispatcher.clone();
let mut rx = dispatcher2
.run(&ProjectId::generate(), Duration::from_millis(10))
.unwrap();
tokio::spawn(async move {
tokio::time::sleep(Duration::from_millis(50)).await;
dispatcher.stop();
});
let mut count = 0_i32;
while let Some(event) = rx.recv().await {
if let events::Event::Tick(_) = event {
count += 1_i32;
}
}
assert!(count >= 4_i32);
}
}

View File

@ -9,7 +9,6 @@ use crate::{
#[derive(Debug, PartialEq, Clone)] #[derive(Debug, PartialEq, Clone)]
pub enum Event { pub enum Event {
Tick(ProjectId),
Flush(ProjectId, sessions::Session), Flush(ProjectId, sessions::Session),
FetchGitbutlerData(ProjectId), FetchGitbutlerData(ProjectId),
@ -41,8 +40,7 @@ impl Event {
match self { match self {
Event::Analytics(event) => event.project_id(), Event::Analytics(event) => event.project_id(),
Event::Emit(event) => event.project_id(), Event::Emit(event) => event.project_id(),
Event::Tick(project_id) Event::IndexAll(project_id)
| Event::IndexAll(project_id)
| Event::FetchGitbutlerData(project_id) | Event::FetchGitbutlerData(project_id)
| Event::FetchProjectData(project_id) | Event::FetchProjectData(project_id)
| Event::Flush(project_id, _) | Event::Flush(project_id, _)
@ -65,7 +63,6 @@ impl Display for Event {
match self { match self {
Event::Analytics(event) => write!(f, "Analytics({})", event), Event::Analytics(event) => write!(f, "Analytics({})", event),
Event::Emit(event) => write!(f, "Emit({})", event.name()), Event::Emit(event) => write!(f, "Emit({})", event.name()),
Event::Tick(project_id) => write!(f, "Tick({})", project_id,),
Event::FetchGitbutlerData(pid) => { Event::FetchGitbutlerData(pid) => {
write!(f, "FetchGitbutlerData({})", pid,) write!(f, "FetchGitbutlerData({})", pid,)
} }

View File

@ -9,7 +9,6 @@ mod git_file_change;
mod index_handler; mod index_handler;
mod push_gitbutler_data; mod push_gitbutler_data;
mod push_project_to_gitbutler; mod push_project_to_gitbutler;
mod tick_handler;
use std::time; use std::time;
@ -24,7 +23,6 @@ use super::events;
#[derive(Clone)] #[derive(Clone)]
pub struct Handler { pub struct Handler {
git_file_change_handler: git_file_change::Handler, git_file_change_handler: git_file_change::Handler,
tick_handler: tick_handler::Handler,
flush_session_handler: flush_session::Handler, flush_session_handler: flush_session::Handler,
fetch_project_handler: fetch_project_data::Handler, fetch_project_handler: fetch_project_data::Handler,
fetch_gitbutler_handler: fetch_gitbutler_data::Handler, fetch_gitbutler_handler: fetch_gitbutler_data::Handler,
@ -48,7 +46,6 @@ impl TryFrom<&AppHandle> for Handler {
} else { } else {
let handler = Handler::new( let handler = Handler::new(
git_file_change::Handler::try_from(value)?, git_file_change::Handler::try_from(value)?,
tick_handler::Handler::try_from(value)?,
flush_session::Handler::try_from(value)?, flush_session::Handler::try_from(value)?,
fetch_project_data::Handler::try_from(value)?, fetch_project_data::Handler::try_from(value)?,
fetch_gitbutler_data::Handler::try_from(value)?, fetch_gitbutler_data::Handler::try_from(value)?,
@ -71,7 +68,6 @@ impl Handler {
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
fn new( fn new(
git_file_change_handler: git_file_change::Handler, git_file_change_handler: git_file_change::Handler,
tick_handler: tick_handler::Handler,
flush_session_handler: flush_session::Handler, flush_session_handler: flush_session::Handler,
fetch_project_handler: fetch_project_data::Handler, fetch_project_handler: fetch_project_data::Handler,
fetch_gitbutler_handler: fetch_gitbutler_data::Handler, fetch_gitbutler_handler: fetch_gitbutler_data::Handler,
@ -86,7 +82,6 @@ impl Handler {
) -> Self { ) -> Self {
Self { Self {
git_file_change_handler, git_file_change_handler,
tick_handler,
flush_session_handler, flush_session_handler,
fetch_project_handler, fetch_project_handler,
fetch_gitbutler_handler, fetch_gitbutler_handler,
@ -148,11 +143,6 @@ impl Handler {
.await .await
.context("failed to fetch project data"), .context("failed to fetch project data"),
events::Event::Tick(project_id) => self
.tick_handler
.handle(project_id, &now)
.context("failed to handle tick"),
events::Event::Flush(project_id, session) => self events::Event::Flush(project_id, session) => self
.flush_session_handler .flush_session_handler
.handle(project_id, session) .handle(project_id, session)

View File

@ -1,265 +0,0 @@
use std::{path, time};
use anyhow::{Context, Result};
use tauri::{AppHandle, Manager};
use crate::{
gb_repository, project_repository,
projects::{self, FetchResult, ProjectId},
sessions, users,
};
use super::events;
#[derive(Clone)]
pub struct Handler {
local_data_dir: path::PathBuf,
projects: projects::Controller,
users: users::Controller,
}
impl TryFrom<&AppHandle> for Handler {
type Error = anyhow::Error;
fn try_from(value: &AppHandle) -> std::result::Result<Self, Self::Error> {
if let Some(handler) = value.try_state::<Handler>() {
Ok(handler.inner().clone())
} else if let Some(app_data_dir) = value.path_resolver().app_data_dir() {
let handler = Handler::new(
app_data_dir,
projects::Controller::try_from(value)?,
users::Controller::try_from(value)?,
);
value.manage(handler.clone());
Ok(handler)
} else {
Err(anyhow::anyhow!("failed to get app data dir"))
}
}
}
const GB_FETCH_INTERVAL: time::Duration = time::Duration::new(15 * 60, 0);
const PROJECT_FETCH_INTERVAL: time::Duration = time::Duration::new(15 * 60, 0);
const PROJECT_PUSH_INTERVAL: time::Duration = time::Duration::new(15 * 60, 0);
impl Handler {
fn new(
local_data_dir: path::PathBuf,
projects: projects::Controller,
users: users::Controller,
) -> Self {
Self {
local_data_dir,
projects,
users,
}
}
pub fn handle(
&self,
project_id: &ProjectId,
now: &time::SystemTime,
) -> Result<Vec<events::Event>> {
let user = self.users.get_user()?;
let project = self.projects.get(project_id)?;
let project_repository = match project_repository::Repository::open(&project) {
Ok(project_repository) => Ok(project_repository),
Err(project_repository::OpenError::NotFound(_)) => return Ok(vec![]),
Err(error) => Err(error),
}
.context("failed to open project repository")?;
let gb_repo = gb_repository::Repository::open(
&self.local_data_dir,
&project_repository,
user.as_ref(),
)
.context("failed to open repository")?;
let mut events = vec![];
let project_data_last_fetch = project
.project_data_last_fetch
.as_ref()
.map(FetchResult::timestamp)
.copied()
.unwrap_or(time::UNIX_EPOCH);
if now.duration_since(project_data_last_fetch)? > PROJECT_FETCH_INTERVAL {
events.push(events::Event::FetchProjectData(*project_id));
}
if project.is_sync_enabled() {
let gb_data_last_fetch = project
.gitbutler_data_last_fetch
.as_ref()
.map(FetchResult::timestamp)
.copied()
.unwrap_or(time::UNIX_EPOCH);
if now.duration_since(gb_data_last_fetch)? > GB_FETCH_INTERVAL {
events.push(events::Event::FetchGitbutlerData(*project_id));
}
}
if let Some(current_session) = gb_repo
.get_current_session()
.context("failed to get current session")?
{
if should_flush(now, &current_session)? {
events.push(events::Event::Flush(*project_id, current_session));
}
}
let should_push_code = project_repository.project().is_sync_enabled()
&& project_repository.project().has_code_url();
if should_push_code {
let project_code_last_push = project
.gitbutler_code_push_state
.as_ref()
.map(|state| &state.timestamp)
.copied()
.unwrap_or(time::UNIX_EPOCH);
if now.duration_since(project_code_last_push)? > PROJECT_PUSH_INTERVAL {
events.push(events::Event::PushProjectToGitbutler(*project_id));
}
}
Ok(events)
}
}
fn should_flush(now: &time::SystemTime, session: &sessions::Session) -> Result<bool> {
Ok(!is_session_active(now, session)? || is_session_too_old(now, session)?)
}
const ONE_HOUR: time::Duration = time::Duration::new(60 * 60, 0);
fn is_session_too_old(now: &time::SystemTime, session: &sessions::Session) -> Result<bool> {
let session_start =
time::UNIX_EPOCH + time::Duration::from_millis(session.meta.start_timestamp_ms.try_into()?);
Ok(session_start + ONE_HOUR < *now)
}
const FIVE_MINUTES: time::Duration = time::Duration::new(5 * 60, 0);
fn is_session_active(now: &time::SystemTime, session: &sessions::Session) -> Result<bool> {
let session_last_update =
time::UNIX_EPOCH + time::Duration::from_millis(session.meta.last_timestamp_ms.try_into()?);
Ok(session_last_update + FIVE_MINUTES > *now)
}
#[cfg(test)]
mod tests {
use crate::sessions::SessionId;
use super::*;
const ONE_MILLISECOND: time::Duration = time::Duration::from_millis(1);
#[test]
fn test_should_flush() {
let now = time::SystemTime::now();
for (start, last, expected) in vec![
(now, now, false), // just created
(now - FIVE_MINUTES, now, false), // active
(
now - FIVE_MINUTES - ONE_MILLISECOND,
now - FIVE_MINUTES,
true,
), // almost not active
(
now - FIVE_MINUTES - ONE_MILLISECOND,
now - FIVE_MINUTES - ONE_MILLISECOND,
true,
), // not active
(now - ONE_HOUR, now, true), // almost too old
(now - ONE_HOUR - ONE_MILLISECOND, now, true), // too old
] {
let session = sessions::Session {
id: SessionId::generate(),
hash: None,
meta: sessions::Meta {
start_timestamp_ms: start.duration_since(time::UNIX_EPOCH).unwrap().as_millis(),
last_timestamp_ms: last.duration_since(time::UNIX_EPOCH).unwrap().as_millis(),
branch: None,
commit: None,
},
};
assert_eq!(should_flush(&now, &session).unwrap(), expected);
}
}
}
#[cfg(test)]
mod test_handler {
use std::time::SystemTime;
use crate::test_utils::{Case, Suite};
use super::super::test_remote_repository;
use super::*;
#[tokio::test]
async fn test_fetch_triggered() -> Result<()> {
let suite = Suite::default();
let Case { project, .. } = suite.new_case();
let cloud = test_remote_repository()?;
let api_project = projects::ApiProject {
name: "test-sync".to_string(),
description: None,
repository_id: "123".to_string(),
git_url: cloud.path().to_str().unwrap().to_string(),
code_git_url: None,
created_at: 0_i32.to_string(),
updated_at: 0_i32.to_string(),
sync: true,
};
suite
.projects
.update(&projects::UpdateRequest {
id: project.id,
api: Some(api_project.clone()),
..Default::default()
})
.await?;
let listener = Handler {
local_data_dir: suite.local_app_data,
projects: suite.projects,
users: suite.users,
};
let result = listener.handle(&project.id, &SystemTime::now()).unwrap();
assert!(result
.iter()
.any(|ev| matches!(ev, events::Event::FetchGitbutlerData(_))));
Ok(())
}
#[test]
fn test_no_fetch_triggered() {
let suite = Suite::default();
let Case { project, .. } = suite.new_case();
let listener = Handler {
local_data_dir: suite.local_app_data,
projects: suite.projects,
users: suite.users,
};
let result = listener.handle(&project.id, &SystemTime::now()).unwrap();
assert!(!result
.iter()
.any(|ev| matches!(ev, events::Event::FetchGitbutlerData(_))));
}
}

View File

@ -9,8 +9,11 @@
import * as hotkeys from '$lib/utils/hotkeys'; import * as hotkeys from '$lib/utils/hotkeys';
import { unsubscribe } from '$lib/utils/random'; import { unsubscribe } from '$lib/utils/random';
import { getRemoteBranches } from '$lib/vbranches/branchStoresCache'; import { getRemoteBranches } from '$lib/vbranches/branchStoresCache';
import { interval, Subscription } from 'rxjs';
import { startWith, tap } from 'rxjs/operators';
import { onMount } from 'svelte'; import { onMount } from 'svelte';
import type { LayoutData } from './$types'; import type { LayoutData } from './$types';
import { page } from '$app/stores';
export let data: LayoutData; export let data: LayoutData;
@ -35,6 +38,17 @@
handleMenuActions(data.projectId); handleMenuActions(data.projectId);
onMount(() => { onMount(() => {
let fetchSub: Subscription;
// Project is auto-fetched on page load and then every 15 minutes
page.subscribe(() => {
fetchSub?.unsubscribe();
fetchSub = interval(1000 * 60 * 15)
.pipe(
startWith(0),
tap(() => baseBranchService.fetchFromTarget())
)
.subscribe();
});
return unsubscribe( return unsubscribe(
menuSubscribe(data.projectId), menuSubscribe(data.projectId),
hotkeys.on('Meta+Shift+S', () => syncToCloud($project$?.id)) hotkeys.on('Meta+Shift+S', () => syncToCloud($project$?.id))