exponential backoff for failed fetches

This commit is contained in:
Nikita Galaiko 2023-07-04 08:40:29 +02:00
parent 0a3f9b6adc
commit 6473751a60
10 changed files with 232 additions and 113 deletions

View File

@ -1,4 +1,4 @@
use std::{collections::HashMap, ops, sync};
use std::{collections::HashMap, ops, sync, time};
use anyhow::{Context, Result};
use tokio::sync::{mpsc, Semaphore};
@ -239,7 +239,10 @@ impl App {
pub fn update_project(&self, project: &projects::UpdateRequest) -> Result<projects::Project> {
let updated = self.projects_storage.update_project(project)?;
if let Err(err) = self.send_event(&project.id, watcher::Event::Fetch) {
if let Err(err) = self.send_event(
&project.id,
watcher::Event::FetchGitbutlerData(time::SystemTime::now()),
) {
log::error!("{}: failed to fetch project: {:#}", &project.id, err);
}

View File

@ -344,7 +344,14 @@ fn run_info(butler: ButlerCli) {
.unwrap_or("none".to_string())
.blue()
);
println!(" last_fetched_ts: {:?}", butler.project.last_fetched_ts);
println!(
" project_data_last_fetched: {:?}",
butler.project.project_data_last_fetched
);
println!(
" project_gitbutler_data_last_fetched: {:?}",
butler.project.gitbutler_data_last_fetched
);
println!(" path: {}", butler.project.path.blue());
if let Some(api) = butler.project.api.as_ref() {
@ -372,7 +379,7 @@ fn run_info(butler: ButlerCli) {
// sessions storage
let sessions = butler
.sessions_db
.list_by_project_id(&butler.project.id, butler.project.last_fetched_ts)
.list_by_project_id(&butler.project.id, None)
.unwrap();
//list the sessions
for session in &sessions {

View File

@ -1,5 +1,5 @@
mod project;
mod storage;
pub use project::{ApiProject, CreateError, Project};
pub use project::{ApiProject, CreateError, FetchResult, Project};
pub use storage::{Storage, UpdateRequest};

View File

@ -1,3 +1,5 @@
use std::time;
use anyhow::Result;
use serde::{Deserialize, Serialize};
use thiserror::Error;
@ -14,6 +16,47 @@ pub struct ApiProject {
pub sync: bool,
}
#[derive(Debug, Deserialize, Serialize, Clone)]
#[serde(rename_all = "camelCase")]
pub enum FetchResult {
Fetched {
timestamp_ms: u128,
},
Error {
timestamp_ms: u128,
error: String,
attempt: u32,
},
}
const TEN_MINUTES: time::Duration = time::Duration::new(10 * 60, 0);
impl FetchResult {
pub fn should_fetch(&self, now: &time::SystemTime) -> Result<bool> {
match self {
FetchResult::Error {
timestamp_ms,
attempt,
..
} => {
// if last fetch errored, wait 10 seconds * 2^attempt, up to 10 minutes
let last_fetch = time::UNIX_EPOCH
+ time::Duration::from_millis(TryInto::<u64>::try_into(*timestamp_ms)?);
Ok(
last_fetch + TEN_MINUTES.min(time::Duration::new(10 * 2u64.pow(*attempt), 0))
< *now,
)
}
FetchResult::Fetched { timestamp_ms } => {
// if last fetch was successful, wait 10 minutes
let last_fetch = time::UNIX_EPOCH
+ time::Duration::from_millis(TryInto::<u64>::try_into(*timestamp_ms)?);
Ok(last_fetch + TEN_MINUTES < *now)
}
}
}
}
#[derive(Debug, Deserialize, Serialize, Clone, Default)]
pub struct Project {
pub id: String,
@ -22,7 +65,9 @@ pub struct Project {
pub path: String,
pub api: Option<ApiProject>,
#[serde(default)]
pub last_fetched_ts: Option<u128>,
pub project_data_last_fetched: Option<FetchResult>,
#[serde(default)]
pub gitbutler_data_last_fetched: Option<FetchResult>,
}
impl AsRef<Project> for Project {

View File

@ -16,7 +16,8 @@ pub struct UpdateRequest {
pub title: Option<String>,
pub description: Option<String>,
pub api: Option<project::ApiProject>,
pub last_fetched_ts: Option<u128>,
pub project_data_last_fetched: Option<project::FetchResult>,
pub gitbutler_data_last_fetched: Option<project::FetchResult>,
}
impl Storage {
@ -72,7 +73,15 @@ impl Storage {
project.api = Some(api.clone());
}
project.last_fetched_ts = update_request.last_fetched_ts;
if let Some(project_data_last_fetched) = update_request.project_data_last_fetched.as_ref() {
project.project_data_last_fetched = Some(project_data_last_fetched.clone());
}
if let Some(gitbutler_data_last_fetched) =
update_request.gitbutler_data_last_fetched.as_ref()
{
project.gitbutler_data_last_fetched = Some(gitbutler_data_last_fetched.clone());
}
self.storage
.write(PROJECTS_FILE, &serde_json::to_string(&projects)?)?;

View File

@ -6,7 +6,8 @@ use crate::{bookmarks, deltas, sessions};
pub enum Event {
Tick(time::SystemTime),
Flush(sessions::Session),
Fetch,
FetchGitbutlerData(time::SystemTime),
FileChange(path::PathBuf),
GitFileChange(path::PathBuf),
@ -30,8 +31,8 @@ impl Display for Event {
match self {
Event::IndexAll => write!(f, "IndexAll"),
Event::Tick(ts) => write!(f, "Tick({:?})", ts),
Event::FetchGitbutlerData(ts) => write!(f, "FetchGitbutlerData({:?})", ts),
Event::Flush(session) => write!(f, "Flush({})", session.id),
Event::Fetch => write!(f, "Fetch"),
Event::GitFetch => write!(f, "GitFetch"),
Event::FileChange(_) => write!(f, "FileChange"),
Event::GitFileChange(_) => write!(f, "GitFileChange"),

View File

@ -1,50 +0,0 @@
use std::time;
use anyhow::{Context, Result};
use crate::projects;
use super::events;
#[derive(Clone)]
pub struct Handler {
project_id: String,
project_storage: projects::Storage,
}
impl Handler {
pub fn new(project_id: String, project_storage: projects::Storage) -> Self {
Self {
project_id,
project_storage,
}
}
pub fn handle(&self, now: time::SystemTime) -> Result<Vec<events::Event>> {
match self
.project_storage
.get_project(&self.project_id)
.context("failed to get project")?
{
None => Ok(vec![]),
Some(project) => {
if should_fetch(now, &project)? {
Ok(vec![events::Event::Fetch])
} else {
Ok(vec![])
}
}
}
}
}
const TEN_MINUTES: time::Duration = time::Duration::new(10 * 60, 0);
pub(super) fn should_fetch(now: time::SystemTime, project: &projects::Project) -> Result<bool> {
if project.last_fetched_ts.is_none() {
return Ok(true);
}
let project_last_fetch = time::UNIX_EPOCH
+ time::Duration::from_millis(project.last_fetched_ts.unwrap().try_into()?);
Ok(project_last_fetch + TEN_MINUTES < now)
}

View File

@ -2,7 +2,7 @@ use std::{path, time};
use anyhow::{Context, Result};
use crate::{gb_repository, project_repository, projects, users};
use crate::{gb_repository, projects, users};
use super::events;
@ -29,7 +29,21 @@ impl Handler {
}
}
pub fn handle(&self) -> Result<Vec<events::Event>> {
pub fn handle(&self, now: time::SystemTime) -> Result<Vec<events::Event>> {
let project = self
.project_storage
.get_project(&self.project_id)
.context("failed to get project")?
.ok_or_else(|| anyhow::anyhow!("project not found"))?;
if !project
.gitbutler_data_last_fetched
.as_ref()
.map_or(Ok(true), |r| r.should_fetch(&now))?
{
return Ok(vec![]);
}
let gb_repo = gb_repository::Repository::open(
self.local_data_dir.clone(),
self.project_id.clone(),
@ -43,45 +57,32 @@ impl Handler {
.filter_map(|s| s.ok())
.collect::<Vec<_>>();
// update last_fetched no matter what happens
let fetch_result = if let Err(err) = gb_repo.fetch() {
projects::FetchResult::Error {
attempt: project
.gitbutler_data_last_fetched
.as_ref()
.map_or(0, |r| match r {
projects::FetchResult::Error { attempt, .. } => *attempt + 1,
projects::FetchResult::Fetched { .. } => 0,
}),
timestamp_ms: now.duration_since(time::UNIX_EPOCH)?.as_millis(),
error: err.to_string(),
}
} else {
projects::FetchResult::Fetched {
timestamp_ms: now.duration_since(time::UNIX_EPOCH)?.as_millis(),
}
};
self.project_storage
.update_project(&projects::UpdateRequest {
id: self.project_id.clone(),
last_fetched_ts: Some(
time::SystemTime::now()
.duration_since(time::UNIX_EPOCH)
.context("failed to get time since epoch")?
.as_millis(),
),
gitbutler_data_last_fetched: Some(fetch_result),
..Default::default()
})
.context("failed to update project")?;
let mut fetched = false;
if let Err(err) = gb_repo.fetch() {
log::error!("failed to fetch from handler: {}", err);
} else {
fetched = true
};
let project = self
.project_storage
.get_project(&self.project_id)
.context("failed to get project")?
.ok_or_else(|| anyhow::anyhow!("project not found"))?;
let project_repository = project_repository::Repository::open(&project)
.context("failed to open project repository")?;
if let Err(err) = project_repository.fetch() {
log::error!("failed to fetch: {}", err);
} else {
fetched = true
};
if !fetched {
return Ok(vec![]);
}
let sessions_after_fetch = gb_repo
.get_sessions_iterator()?
.filter_map(|s| s.ok())

View File

@ -0,0 +1,68 @@
use std::time;
use anyhow::{Context, Result};
use crate::{project_repository, projects};
use super::events;
#[derive(Clone)]
pub struct Handler {
project_id: String,
project_storage: projects::Storage,
}
impl Handler {
pub fn new(project_id: String, project_storage: projects::Storage) -> Self {
Self {
project_id,
project_storage,
}
}
pub fn handle(&self, now: time::SystemTime) -> Result<Vec<events::Event>> {
let project = self
.project_storage
.get_project(&self.project_id)
.context("failed to get project")?
.ok_or_else(|| anyhow::anyhow!("project not found"))?;
if !project
.project_data_last_fetched
.as_ref()
.map_or(Ok(true), |r| r.should_fetch(&now))?
{
return Ok(vec![]);
}
let project_repository = project_repository::Repository::open(&project)?;
let fetch_result = if let Err(err) = project_repository.fetch() {
projects::FetchResult::Error {
attempt: project
.project_data_last_fetched
.as_ref()
.map_or(0, |r| match r {
projects::FetchResult::Error { attempt, .. } => *attempt + 1,
projects::FetchResult::Fetched { .. } => 0,
}),
timestamp_ms: now.duration_since(time::UNIX_EPOCH)?.as_millis(),
error: err.to_string(),
}
} else {
projects::FetchResult::Fetched {
timestamp_ms: now.duration_since(time::UNIX_EPOCH)?.as_millis(),
}
};
self.project_storage
.update_project(&projects::UpdateRequest {
id: self.project_id.clone(),
project_data_last_fetched: Some(fetch_result),
..Default::default()
})
.context("failed to update project")?;
Ok(vec![])
}
}

View File

@ -1,6 +1,6 @@
mod check_current_session;
mod check_fetch_project;
mod fetch_project;
mod fetch_gitbutler_data;
mod fetch_project_data;
mod file_change;
mod flush_session;
mod git_file_change;
@ -29,8 +29,8 @@ pub struct Handler {
git_file_change_handler: git_file_change::Handler,
check_current_session_handler: check_current_session::Handler,
flush_session_handler: flush_session::Handler,
fetch_project_handler: fetch_project::Handler,
chech_fetch_project_handler: check_fetch_project::Handler,
fetch_project_handler: fetch_project_data::Handler,
fetch_gitbutler_handler: fetch_gitbutler_data::Handler,
index_handler: index_handler::Handler,
events_sender: app_events::Sender,
@ -77,16 +77,16 @@ impl<'handler> Handler {
project_store.clone(),
user_store.clone(),
),
fetch_project_handler: fetch_project::Handler::new(
fetch_project_handler: fetch_project_data::Handler::new(
project_id.clone(),
project_store.clone(),
),
fetch_gitbutler_handler: fetch_gitbutler_data::Handler::new(
local_data_dir.clone(),
project_id.clone(),
project_store.clone(),
user_store.clone(),
),
chech_fetch_project_handler: check_fetch_project::Handler::new(
project_id.clone(),
project_store.clone(),
),
index_handler: index_handler::Handler::new(
local_data_dir,
project_id,
@ -144,22 +144,57 @@ impl<'handler> Handler {
.context("failed to send git index event")?;
Ok(vec![])
}
events::Event::FetchGitbutlerData(tick) => self
.fetch_gitbutler_handler
.handle(tick)
.context("failed to fetch gitbutler data"),
events::Event::Tick(tick) => {
let one = self
.check_current_session_handler
.handle(tick)
.context("failed to handle tick event")?;
let two = self
.chech_fetch_project_handler
.handle(tick)
.context("failed to handle tick event")?;
Ok(one.into_iter().chain(two.into_iter()).collect())
let one = match self.check_current_session_handler.handle(tick) {
Ok(events) => events,
Err(err) => {
log::error!(
"{}: failed to check current session: {:#?}",
self.project_id,
err
);
vec![]
}
};
let two = match self.fetch_project_handler.handle(tick) {
Ok(events) => events,
Err(err) => {
log::error!(
"{}: failed to fetch project data: {:#?}",
self.project_id,
err
);
vec![]
}
};
let three = match self.fetch_gitbutler_handler.handle(tick) {
Ok(events) => events,
Err(err) => {
log::error!(
"{}: failed to fetch gitbutler data: {:#?}",
self.project_id,
err
);
vec![]
}
};
Ok(one
.into_iter()
.chain(two.into_iter())
.chain(three.into_iter())
.collect())
}
events::Event::Flush(session) => self
.flush_session_handler
.handle(&session)
.context("failed to handle flush session event"),
events::Event::Fetch => self.fetch_project_handler.handle(),
events::Event::SessionFile((session_id, file_path, contents)) => {
let file_events = self