fetch remotes every 10 min

This commit is contained in:
Nikita Galaiko 2023-05-08 12:43:42 +02:00
parent 55fdb08b83
commit cea80b5cb1
9 changed files with 215 additions and 21 deletions

View File

@ -114,6 +114,77 @@ impl Repository {
&self.project_id &self.project_id
} }
pub fn fetch(&self) -> Result<bool> {
let user = self.users_store.get().context("failed to get user")?;
let project = self
.project_store
.get_project(&self.project_id)
.context("failed to get project")?
.ok_or(anyhow!("project not found"))?;
let project = project.as_ref();
// only push if logged in
let access_token = match user {
Some(user) => user.access_token.clone(),
None => return Ok(false),
};
// only push if project is connected
let remote_url = match project.api {
Some(ref api) => api.git_url.clone(),
None => return Ok(false),
};
let mut remote = self
.git_repository
.remote_anonymous(remote_url.as_str())
.with_context(|| {
format!(
"failed to create anonymous remote for {}",
remote_url.as_str()
)
})?;
let mut callbacks = git2::RemoteCallbacks::new();
callbacks.push_update_reference(move |refname, message| {
log::info!(
"{}: pulling reference '{}': {:?}",
project.id,
refname,
message
);
Result::Ok(())
});
callbacks.push_transfer_progress(move |one, two, three| {
log::info!(
"{}: transferred {}/{}/{} objects",
project.id,
one,
two,
three
);
});
let mut fetch_opts = git2::FetchOptions::new();
fetch_opts.remote_callbacks(callbacks);
let auth_header = format!("Authorization: {}", access_token);
let headers = &[auth_header.as_str()];
fetch_opts.custom_headers(headers);
remote
.fetch(
&["refs/heads/*:refs/remotes/*"],
Some(&mut fetch_opts),
None,
)
.with_context(|| format!("failed to pull from remote {}", remote_url.as_str()))?;
log::info!("{}: fetched from {}", project.path, remote_url.as_str());
Ok(true)
}
fn create_current_session( fn create_current_session(
&self, &self,
project_repository: &project_repository::Repository, project_repository: &project_repository::Repository,

View File

@ -14,12 +14,14 @@ pub struct ApiProject {
pub sync: bool, pub sync: bool,
} }
#[derive(Debug, Deserialize, Serialize, Clone)] #[derive(Debug, Deserialize, Serialize, Clone, Default)]
pub struct Project { pub struct Project {
pub id: String, pub id: String,
pub title: String, pub title: String,
pub path: String, pub path: String,
pub api: Option<ApiProject>, pub api: Option<ApiProject>,
#[serde(default)]
pub last_fetched_ts: Option<u128>,
} }
impl AsRef<Project> for Project { impl AsRef<Project> for Project {
@ -70,6 +72,7 @@ impl Project {
title, title,
path: path.to_str().unwrap().to_string(), path: path.to_str().unwrap().to_string(),
api: None, api: None,
..Default::default()
}; };
Ok(project) Ok(project)

View File

@ -15,6 +15,7 @@ pub struct UpdateRequest {
pub id: String, pub id: String,
pub title: Option<String>, pub title: Option<String>,
pub api: Option<project::ApiProject>, pub api: Option<project::ApiProject>,
pub last_fetched_ts: Option<u128>,
} }
impl Storage { impl Storage {
@ -54,6 +55,8 @@ impl Storage {
project.api = Some(api.clone()); project.api = Some(api.clone());
} }
project.last_fetched_ts = update_request.last_fetched_ts;
self.storage self.storage
.write(PROJECTS_FILE, &serde_json::to_string(&projects)?)?; .write(PROJECTS_FILE, &serde_json::to_string(&projects)?)?;

View File

@ -9,6 +9,7 @@ pub enum Event {
Tick(time::SystemTime), Tick(time::SystemTime),
FlushSession(sessions::Session), FlushSession(sessions::Session),
SessionFlushed(sessions::Session), SessionFlushed(sessions::Session),
FetchProject(projects::Project),
FileChange(path::PathBuf), FileChange(path::PathBuf),
GitFileChange(path::PathBuf), GitFileChange(path::PathBuf),

View File

@ -27,7 +27,7 @@ impl<'handler> Handler<'handler> {
{ {
None => Ok(vec![]), None => Ok(vec![]),
Some(current_session) => { Some(current_session) => {
if should_flush(now, &current_session) { if should_flush(now, &current_session)? {
Ok(vec![events::Event::FlushSession(current_session)]) Ok(vec![events::Event::FlushSession(current_session)])
} else { } else {
Ok(vec![]) Ok(vec![])
@ -37,22 +37,22 @@ impl<'handler> Handler<'handler> {
} }
} }
pub(super) fn should_flush(now: time::SystemTime, session: &sessions::Session) -> bool { pub(super) fn should_flush(now: time::SystemTime, session: &sessions::Session) -> Result<bool> {
!is_session_active(now, session) || is_session_too_old(now, session) Ok(!is_session_active(now, session)? || is_session_too_old(now, session)?)
} }
const ONE_HOUR: time::Duration = time::Duration::new(60 * 60, 0); const ONE_HOUR: time::Duration = time::Duration::new(60 * 60, 0);
fn is_session_too_old(now: time::SystemTime, session: &sessions::Session) -> bool { fn is_session_too_old(now: time::SystemTime, session: &sessions::Session) -> Result<bool> {
let session_start = time::UNIX_EPOCH let session_start =
+ time::Duration::from_millis(session.meta.start_timestamp_ms.try_into().unwrap()); time::UNIX_EPOCH + time::Duration::from_millis(session.meta.start_timestamp_ms.try_into()?);
session_start + ONE_HOUR < now Ok(session_start + ONE_HOUR < now)
} }
const FIVE_MINUTES: time::Duration = time::Duration::new(5 * 60, 0); const FIVE_MINUTES: time::Duration = time::Duration::new(5 * 60, 0);
fn is_session_active(now: time::SystemTime, session: &sessions::Session) -> bool { fn is_session_active(now: time::SystemTime, session: &sessions::Session) -> Result<bool> {
let session_last_update = time::UNIX_EPOCH let session_last_update =
+ time::Duration::from_millis(session.meta.last_timestamp_ms.try_into().unwrap()); time::UNIX_EPOCH + time::Duration::from_millis(session.meta.last_timestamp_ms.try_into()?);
session_last_update + FIVE_MINUTES > now Ok(session_last_update + FIVE_MINUTES > now)
} }

View File

@ -26,10 +26,10 @@ fn test_should_flush() -> Result<()> {
}, },
}; };
assert!(!should_flush(now, &session)); assert!(!should_flush(now, &session)?);
assert!(should_flush(start + FIVE_MINUTES, &session)); assert!(should_flush(start + FIVE_MINUTES, &session)?);
assert!(should_flush(last + ONE_HOUR, &session)); assert!(should_flush(last + ONE_HOUR, &session)?);
Ok(()) Ok(())
} }

View File

@ -0,0 +1,49 @@
use std::time;
use anyhow::{Context, Result};
use crate::app::projects;
use super::events;
pub struct Handler {
project_id: String,
project_storage: projects::Storage,
}
impl Handler {
pub fn new(project_id: String, project_storage: projects::Storage) -> Self {
Self {
project_id,
project_storage,
}
}
pub fn handle(&self, now: time::SystemTime) -> Result<Vec<events::Event>> {
match self
.project_storage
.get_project(&self.project_id)
.context("failed to get project")?
{
None => Ok(vec![]),
Some(project) => {
if should_fetch(now, &project)? {
Ok(vec![events::Event::FetchProject(project)])
} else {
Ok(vec![])
}
}
}
}
}
const TEN_MINUTES: time::Duration = time::Duration::new(10 * 60, 0);
pub(super) fn should_fetch(now: time::SystemTime, project: &projects::Project) -> Result<bool> {
if project.last_fetched_ts.is_none() {
return Ok(true);
}
let project_last_fetch = time::UNIX_EPOCH
+ time::Duration::from_millis(project.last_fetched_ts.unwrap().try_into()?);
Ok(project_last_fetch + TEN_MINUTES < now)
}

View File

@ -0,0 +1,46 @@
use std::time;
use anyhow::{Context, Result};
use crate::{app::gb_repository, projects};
use super::events;
pub struct Handler<'handler> {
project_storage: projects::Storage,
gb_repository: &'handler gb_repository::Repository,
}
impl<'listener> Handler<'listener> {
pub fn new(
project_storage: projects::Storage,
gb_repository: &'listener gb_repository::Repository,
) -> Self {
Self {
project_storage,
gb_repository,
}
}
pub fn handle(&self, project: &projects::Project) -> Result<Vec<events::Event>> {
if !self.gb_repository.fetch().context("failed to fetch")? {
return Ok(vec![]);
}
self.project_storage
.update_project(&projects::UpdateRequest {
id: project.id.clone(),
last_fetched_ts: Some(
time::SystemTime::now()
.duration_since(time::UNIX_EPOCH)
.context("failed to get time since epoch")?
.as_millis()
.try_into()
.context("failed to convert time to millis")?,
),
..Default::default()
})
.context("failed to update project")?;
Ok(vec![])
}
}

View File

@ -1,4 +1,6 @@
mod check_current_session; mod check_current_session;
mod check_fetch_project;
mod fetch_project;
mod file_change; mod file_change;
mod flush_session; mod flush_session;
mod git_file_change; mod git_file_change;
@ -25,6 +27,8 @@ pub struct Handler<'handler> {
git_file_change_handler: git_file_change::Handler, git_file_change_handler: git_file_change::Handler,
check_current_session_handler: check_current_session::Handler<'handler>, check_current_session_handler: check_current_session::Handler<'handler>,
flush_session_handler: flush_session::Handler<'handler>, flush_session_handler: flush_session::Handler<'handler>,
fetch_project_handler: fetch_project::Handler<'handler>,
chech_fetch_project_handler: check_fetch_project::Handler,
searcher: search::Deltas, searcher: search::Deltas,
events: sync::mpsc::Sender<app_events::Event>, events: sync::mpsc::Sender<app_events::Event>,
@ -53,10 +57,19 @@ impl<'handler> Handler<'handler> {
project_store.clone(), project_store.clone(),
), ),
flush_session_handler: flush_session::Handler::new( flush_session_handler: flush_session::Handler::new(
project_id, project_id.clone(),
project_store, project_store.clone(),
gb_repository, gb_repository,
), ),
fetch_project_handler: fetch_project::Handler::new(
project_store.clone(),
gb_repository,
),
chech_fetch_project_handler: check_fetch_project::Handler::new(
project_id,
project_store,
),
searcher, searcher,
events, events,
} }
@ -108,10 +121,17 @@ impl<'handler> Handler<'handler> {
.context("failed to send git index event")?; .context("failed to send git index event")?;
Ok(vec![]) Ok(vec![])
} }
events::Event::Tick(tick) => self events::Event::Tick(tick) => {
.check_current_session_handler let one = self
.handle(tick) .check_current_session_handler
.context("failed to handle tick event"), .handle(tick)
.context("failed to handle tick event")?;
let two = self
.chech_fetch_project_handler
.handle(tick)
.context("failed to handle tick event")?;
Ok(one.into_iter().chain(two.into_iter()).collect())
}
events::Event::FlushSession(session) => self events::Event::FlushSession(session) => self
.flush_session_handler .flush_session_handler
.handle(&session) .handle(&session)
@ -122,6 +142,7 @@ impl<'handler> Handler<'handler> {
.context("failed to index session")?; .context("failed to index session")?;
Ok(vec![]) Ok(vec![])
} }
events::Event::FetchProject(project) => self.fetch_project_handler.handle(&project),
} }
} }
} }