log project id in wathcers

This commit is contained in:
Nikita Galaiko 2023-03-09 15:34:52 +01:00
parent 7c98076032
commit 8ffcfdc3c9
No known key found for this signature in database
GPG Key ID: EBAB54E845BA519D
9 changed files with 262 additions and 206 deletions

View File

@ -19,12 +19,19 @@ pub fn read(project: &projects::Project, file_path: &Path) -> Result<Option<Vec<
let file_deltas = std::fs::read_to_string(&file_deltas_path).with_context(|| {
format!(
"Failed to read file deltas from {}",
"failed to read file deltas from {}",
file_deltas_path.to_str().unwrap()
)
})?;
Ok(Some(serde_json::from_str(&file_deltas)?))
let deltas: Vec<Delta> = serde_json::from_str(&file_deltas).with_context(|| {
format!(
"failed to parse file deltas from {}",
file_deltas_path.to_str().unwrap()
)
})?;
Ok(Some(deltas))
}
pub fn write(
@ -34,27 +41,32 @@ pub fn write(
deltas: &Vec<Delta>,
) -> Result<sessions::Session> {
// make sure we always have a session before writing deltas
let mut session = match sessions::Session::current(repo, project)? {
Some(session) => Ok(session),
let session = match sessions::Session::current(repo, project)? {
Some(mut session) => {
session
.touch(project)
.with_context(|| format!("failed to touch session {}", session.id))?;
Ok(session)
}
None => sessions::Session::from_head(repo, project),
}?;
let delta_path = project.deltas_path().join(file_path);
let delta_dir = delta_path.parent().unwrap();
std::fs::create_dir_all(&delta_dir)?;
log::info!("mkdir {}", delta_path.to_str().unwrap());
log::info!("Writing deltas to {}", delta_path.to_str().unwrap());
log::info!(
"{}: writing deltas to {}",
project.id,
delta_path.to_str().unwrap()
);
let raw_deltas = serde_json::to_string(&deltas)?;
std::fs::write(delta_path.clone(), raw_deltas).with_context(|| {
format!(
"Failed to write file deltas to {}",
"failed to write file deltas to {}",
delta_path.to_str().unwrap()
)
})?;
// update last session activity timestamp
session.update(project)?;
Ok(session)
}

View File

@ -18,23 +18,19 @@ fn apply_deltas(doc: &mut Vec<char>, deltas: &Vec<deltas::Delta>) -> Result<()>
}
impl TextDocument {
// creates a new text document from a deltas.
pub fn from_deltas(deltas: Vec<deltas::Delta>) -> Result<TextDocument> {
let mut doc = vec![];
apply_deltas(&mut doc, &deltas)?;
Ok(TextDocument { doc, deltas })
}
pub fn get_deltas(&self) -> Vec<deltas::Delta> {
self.deltas.clone()
}
// returns a text document where internal state is seeded with value, and deltas are applied.
pub fn new(value: &str, deltas: Vec<deltas::Delta>) -> Result<TextDocument> {
let mut all_deltas = vec![deltas::Delta {
pub fn new(value: Option<&str>, deltas: Vec<deltas::Delta>) -> Result<TextDocument> {
let mut all_deltas = vec![];
if let Some(value) = value {
all_deltas.push(deltas::Delta {
operations: operations::get_delta_operations("", value),
timestamp_ms: 0,
}];
});
}
all_deltas.append(&mut deltas.clone());
let mut doc = vec![];
apply_deltas(&mut doc, &all_deltas)?;

View File

@ -2,7 +2,7 @@ use crate::deltas::{operations::Operation, text_document::TextDocument, Delta};
#[test]
fn test_new() {
let document = TextDocument::new("hello world", vec![]);
let document = TextDocument::new(Some("hello world"), vec![]);
assert_eq!(document.is_ok(), true);
let document = document.unwrap();
assert_eq!(document.to_string(), "hello world");
@ -11,7 +11,7 @@ fn test_new() {
#[test]
fn test_update() {
let document = TextDocument::new("hello world", vec![]);
let document = TextDocument::new(Some("hello world"), vec![]);
assert_eq!(document.is_ok(), true);
let mut document = document.unwrap();
document.update("hello world!").unwrap();
@ -26,7 +26,7 @@ fn test_update() {
#[test]
fn test_empty() {
let document = TextDocument::from_deltas(vec![]);
let document = TextDocument::new(None, vec![]);
assert_eq!(document.is_ok(), true);
let mut document = document.unwrap();
document.update("hello world!").unwrap();
@ -41,7 +41,9 @@ fn test_empty() {
#[test]
fn test_from_deltas() {
let document = TextDocument::from_deltas(vec![
let document = TextDocument::new(
None,
vec![
Delta {
timestamp_ms: 0,
operations: vec![Operation::Insert((0, "hello".to_string()))],
@ -57,7 +59,8 @@ fn test_from_deltas() {
Operation::Insert((4, "!".to_string())),
],
},
]);
],
);
assert_eq!(document.is_ok(), true);
let document = document.unwrap();
assert_eq!(document.to_string(), "held!");
@ -65,7 +68,7 @@ fn test_from_deltas() {
#[test]
fn test_complex_line() {
let document = TextDocument::from_deltas(vec![]);
let document = TextDocument::new(None, vec![]);
assert_eq!(document.is_ok(), true);
let mut document = document.unwrap();
@ -103,7 +106,7 @@ fn test_complex_line() {
#[test]
fn test_multiline_add() {
let document = TextDocument::from_deltas(vec![]);
let document = TextDocument::new(None, vec![]);
assert_eq!(document.is_ok(), true);
let mut document = document.unwrap();
@ -141,7 +144,7 @@ fn test_multiline_add() {
#[test]
fn test_multiline_remove() {
let document = TextDocument::from_deltas(vec![]);
let document = TextDocument::new(None, vec![]);
assert_eq!(document.is_ok(), true);
let mut document = document.unwrap();

View File

@ -1,6 +1,6 @@
use crate::projects::project;
use crate::storage;
use anyhow::Result;
use anyhow::{Context, Result};
use serde::{Deserialize, Serialize};
const PROJECTS_FILE: &str = "projects.json";
@ -26,7 +26,8 @@ impl Storage {
pub fn list_projects(&self) -> Result<Vec<project::Project>> {
match self.storage.read(PROJECTS_FILE)? {
Some(projects) => {
let all_projects: Vec<project::Project> = serde_json::from_str(&projects)?;
let all_projects: Vec<project::Project> = serde_json::from_str(&projects)
.with_context(|| format!("Failed to parse projects from {}", PROJECTS_FILE))?;
let non_deleted_projects = all_projects
.into_iter()
.filter(|p: &project::Project| !p.deleted)

View File

@ -132,15 +132,12 @@ fn init(
Some(session) => session,
None => sessions::Session::from_head(git_repository, project)?,
};
current_session.flush(git_repository, user, project)?;
current_session
.flush(git_repository, user, project)
.with_context(|| format!("{}: failed to flush session", project.id))?;
Ok(())
} else {
Err(error).with_context(|| {
format!(
"failed to find reference {} in repository {}",
reference_name, project.path
)
})
Err(error.into())
}
}
}

View File

@ -154,7 +154,7 @@ impl Session {
meta,
activity,
};
create(project, &session)?;
create(project, &session).with_context(|| "failed to create current session from head")?;
Ok(session)
}
@ -239,7 +239,7 @@ impl Session {
})
}
pub fn update(&mut self, project: &projects::Project) -> Result<()> {
pub fn touch(&mut self, project: &projects::Project) -> Result<()> {
update(project, self)
}
@ -325,11 +325,11 @@ fn update(project: &projects::Project, session: &mut Session) -> Result<()> {
.as_millis();
let session_path = project.session_path();
log::debug!("{}: Updating current session", session_path.display());
log::debug!("{}: updating current session", project.id);
if session_path.exists() {
write(&session_path, session)
} else {
Err(anyhow!("session does not exist"))
Err(anyhow!("\"{}\" does not exist", session_path.display()))
}
}
@ -346,7 +346,7 @@ fn create(project: &projects::Project, session: &Session) -> Result<()> {
fn delete(project: &projects::Project) -> Result<()> {
let session_path = project.session_path();
log::debug!("{}: Deleting current session", session_path.display());
log::debug!("{}: deleting current session", project.id);
if session_path.exists() {
std::fs::remove_dir_all(session_path)?;
}
@ -537,8 +537,8 @@ fn flush(
}
session
.update(project)
.with_context(|| format!("failed to update session"))?;
.touch(project)
.with_context(|| format!("failed to touch session"))?;
let wd_index = &mut git2::Index::new()
.with_context(|| format!("failed to create index for working directory"))?;
@ -586,17 +586,24 @@ fn flush(
)
})?;
log::debug!(
"{}: wrote gb commit {}",
repo.workdir().unwrap().display(),
commit_oid
log::info!(
"{}: flushed session {} into commit {}",
project.id,
session.id,
commit_oid,
);
session.hash = Some(commit_oid.to_string());
delete(project).with_context(|| format!("failed to delete session"))?;
delete(project)?;
if let Err(e) = push_to_remote(repo, user, project) {
log::error!("failed to push gb commit {} to remote: {:#}", commit_oid, e);
log::error!(
"{}: failed to push gb commit {} to remote: {:#}",
project.id,
commit_oid,
e
);
}
Ok(())
@ -679,10 +686,18 @@ fn push_to_remote(
// it ignores files that are in the .gitignore
fn build_wd_index(repo: &git2::Repository, index: &mut git2::Index) -> Result<()> {
// create a new in-memory git2 index and open the working one so we can cheat if none of the metadata of an entry has changed
let repo_index = &mut repo.index()?;
let repo_index = &mut repo
.index()
.with_context(|| format!("failed to open repo index"))?;
// add all files in the working directory to the in-memory index, skipping for matching entries in the repo index
let all_files = fs::list_files(repo.workdir().unwrap())?;
let all_files = fs::list_files(repo.workdir().unwrap()).with_context(|| {
format!(
"failed to list files in {}",
repo.workdir().unwrap().display()
)
})?;
for file in all_files {
let file_path = Path::new(&file);
if !repo.is_path_ignored(&file).unwrap_or(true) {
@ -724,7 +739,7 @@ fn add_wd_path(
&& entry.file_size == u32::try_from(metadata.len())?
&& entry.mode == metadata.mode()
{
log::debug!("Using existing entry for {}", file_path.display());
log::debug!("using existing entry for {}", file_path.display());
index.add(&entry).unwrap();
return Ok(());
}
@ -732,7 +747,7 @@ fn add_wd_path(
// something is different, or not found, so we need to create a new entry
log::debug!("Adding wd path: {}", file_path.display());
log::debug!("adding wd path: {}", file_path.display());
// look for files that are bigger than 4GB, which are not supported by git
// insert a pointer as the blob content instead
@ -770,7 +785,8 @@ fn add_wd_path(
};
// create a new IndexEntry from the file metadata
match index.add(&git2::IndexEntry {
index
.add(&git2::IndexEntry {
ctime: git2::IndexTime::new(
ctime.seconds().try_into()?,
ctime.nanoseconds().try_into().unwrap(),
@ -789,10 +805,10 @@ fn add_wd_path(
flags_extended: 0, // no extended flags
path: rel_file_path.to_str().unwrap().to_string().into(),
id: blob,
}) {
Ok(_) => Ok(()),
Err(e) => Err(e).with_context(|| "failed to add working directory path".to_string()),
}
})
.with_context(|| format!("failed to add index entry for {}", file_path.display()))?;
Ok(())
}
/// calculates sha256 digest of a large file as lowercase hex string via streaming buffer
@ -874,12 +890,8 @@ fn build_session_index(
let session_dir = project.session_path();
for session_file in fs::list_files(&session_dir)? {
let file_path = Path::new(&session_file);
add_session_path(&repo, index, project, &file_path).with_context(|| {
format!(
"Failed to add session file to index: {}",
file_path.display()
)
})?;
add_session_path(&repo, index, project, &file_path)
.with_context(|| format!("failed to add session file: {}", file_path.display()))?;
}
Ok(())
@ -894,7 +906,7 @@ fn add_session_path(
) -> Result<()> {
let file_path = project.session_path().join(rel_file_path);
log::debug!("Adding session path: {}", file_path.display());
log::debug!("adding session path: {}", file_path.display());
let blob = repo.blob_path(&file_path)?;
let metadata = file_path.metadata()?;
@ -902,7 +914,8 @@ fn add_session_path(
let ctime = FileTime::from_creation_time(&metadata).unwrap_or(mtime);
// create a new IndexEntry from the file metadata
index.add(&git2::IndexEntry {
index
.add(&git2::IndexEntry {
ctime: git2::IndexTime::new(
ctime.seconds().try_into()?,
ctime.nanoseconds().try_into().unwrap(),
@ -921,6 +934,12 @@ fn add_session_path(
flags_extended: 0, // no extended flags
path: rel_file_path.to_str().unwrap().into(),
id: blob,
})
.with_context(|| {
format!(
"Failed to add session file to index: {}",
file_path.display()
)
})?;
Ok(())
@ -958,7 +977,8 @@ fn write_gb_commit(
)?;
Ok(new_commit)
}
Err(_) => {
Err(e) => {
if e.code() == git2::ErrorCode::NotFound {
let new_commit = repo.commit(
Some(refname.as_str()),
&author, // author
@ -968,6 +988,9 @@ fn write_gb_commit(
&[], // parents
)?;
Ok(new_commit)
} else {
return Err(e.into());
}
}
}
}

View File

@ -61,7 +61,11 @@ impl DeltaWatchers {
let relative_file_path =
file_path.strip_prefix(project.path.clone()).unwrap();
let repo = git2::Repository::open(&project.path).expect(
format!("failed to open repo at {}", project.path).as_str(),
format!(
"{}: failed to open repo at \"{}\"",
project.id, project.path
)
.as_str(),
);
if repo.is_path_ignored(&relative_file_path).unwrap_or(true) {
@ -85,7 +89,11 @@ impl DeltaWatchers {
if let Err(e) =
sender.send(events::Event::session(&project, &session))
{
log::error!("filed to send session event: {:#}", e)
log::error!(
"{}: failed to send session event: {:#}",
project.id,
e
)
}
if let Err(e) = sender.send(events::Event::detlas(
@ -94,15 +102,23 @@ impl DeltaWatchers {
&deltas,
&relative_file_path,
)) {
log::error!("failed to send deltas event: {:#}", e)
log::error!(
"{}: failed to send deltas event: {:#}",
project.id,
e
)
}
}
Ok(None) => {}
Err(e) => log::error!("failed to register file change: {:#}", e),
Err(e) => log::error!(
"{}: failed to register file change: {:#}",
project.id,
e
),
}
}
}
Err(e) => log::error!("notify event error: {:#}", e),
Err(e) => log::error!("{}: notify event error: {:#}", project.id, e),
}
}
});
@ -120,8 +136,7 @@ impl DeltaWatchers {
// this is what is called when the FS watcher detects a change
// it should figure out delta data (crdt) and update the file at .git/gb/session/deltas/path/to/file
// it also writes the metadata stuff which marks the beginning of a session if a session is not yet started
// returns updated project deltas and sessions to which they belong
// returns current project session and calculated deltas, if any.
pub(crate) fn register_file_change(
project: &projects::Project,
repo: &git2::Repository,
@ -137,17 +152,21 @@ pub(crate) fn register_file_change(
} else {
// file exists, but content is not utf-8, it's a noop
// TODO: support binary files
log::info!("File is not utf-8, ignoring: {:?}", file_path);
log::info!(
"{}: \"{}\" is not utf-8, ignoring",
project.id,
file_path.display()
);
return Ok(None);
}
}
};
// first, we need to check if the file exists in the meta commit
// first, get latest file contens to compare with
let latest_contents = get_latest_file_contents(&repo, project, relative_file_path)
.with_context(|| {
format!(
"Failed to get latest file contents for {}",
"failed to get latest file contents for {}",
relative_file_path.display()
)
})?;
@ -155,32 +174,32 @@ pub(crate) fn register_file_change(
// second, get non-flushed file deltas
let deltas = read(project, relative_file_path).with_context(|| {
format!(
"Failed to get current file deltas for {}",
"failed to get current file deltas for {}",
relative_file_path.display()
)
})?;
// depending on the above, we can create TextDocument suitable for calculating deltas
// depending on the above, we can create TextDocument suitable for calculating _new_ deltas
let mut text_doc = match (latest_contents, deltas) {
(Some(latest_contents), Some(deltas)) => TextDocument::new(&latest_contents, deltas)?,
(Some(latest_contents), None) => TextDocument::new(&latest_contents, vec![])?,
(None, Some(deltas)) => TextDocument::from_deltas(deltas)?,
(None, None) => TextDocument::from_deltas(vec![])?,
(Some(latest_contents), Some(deltas)) => TextDocument::new(Some(&latest_contents), deltas)?,
(Some(latest_contents), None) => TextDocument::new(Some(&latest_contents), vec![])?,
(None, Some(deltas)) => TextDocument::new(None, deltas)?,
(None, None) => TextDocument::new(None, vec![])?,
};
if !text_doc.update(&file_contents)? {
return Ok(None);
} else {
}
// if the file was modified, save the deltas
let deltas = text_doc.get_deltas();
let session = write(&repo, project, relative_file_path, &deltas)?;
Ok(Some((session, deltas)))
}
}
// returns last commited file contents from refs/gitbutler/current ref
// if it doesn't exists, fallsback to HEAD
// returns None if file doesn't exist in HEAD
// if ref doesn't exists, returns file contents from the HEAD repository commit
// returns None if file is not found in either of trees
// returns None if file is not UTF-8
// TODO: handle binary files
fn get_latest_file_contents(
@ -190,17 +209,14 @@ fn get_latest_file_contents(
) -> Result<Option<String>> {
let tree_entry = match repo.find_reference(&project.refname()) {
Ok(reference) => {
let gitbutler_tree = reference.peel_to_tree()?;
let gitbutler_tree_path = &Path::new("wd").join(relative_file_path);
let tree_entry = gitbutler_tree.get_path(gitbutler_tree_path);
tree_entry
// "wd/<file_path>" contents from gitbutler HEAD
reference.peel_to_tree()?.get_path(gitbutler_tree_path)
}
Err(e) => {
if e.code() == git2::ErrorCode::NotFound {
let head = repo.head()?;
let tree = head.peel_to_tree()?;
let tree_entry = tree.get_path(relative_file_path);
tree_entry
// "<file_path>" contents from repository HEAD
repo.head()?.peel_to_tree()?.get_path(relative_file_path)
} else {
Err(e)
}
@ -208,28 +224,34 @@ fn get_latest_file_contents(
};
match tree_entry {
Ok(tree_entry) => {
// if file found, check if delta file exists
let blob = tree_entry.to_object(&repo)?.into_blob().unwrap();
// parse blob as utf-8.
// if it's not utf8, return None
let contents = match String::from_utf8(blob.content().to_vec()) {
Ok(contents) => Some(contents),
Err(_) => {
log::info!("File is not utf-8, ignoring: {:?}", relative_file_path);
None
}
};
Ok(contents)
}
Err(e) => {
if e.code() == git2::ErrorCode::NotFound {
// file not found, return None
// file not found in the chosen tree, return None
Ok(None)
} else {
Err(e.into())
}
}
Ok(tree_entry) => {
let blob = tree_entry.to_object(&repo)?.into_blob().expect(&format!(
"{}: failed to get blob for {}",
project.id,
relative_file_path.display()
));
let text_content = match String::from_utf8(blob.content().to_vec()) {
Ok(contents) => Some(contents),
Err(_) => {
log::info!(
"{}: \"{}\" is not utf-8, ignoring",
project.id,
relative_file_path.display()
);
None
}
};
Ok(text_content)
}
}
}

View File

@ -30,6 +30,7 @@ fn test_register_file_change_must_create_session() {
std::fs::write(Path::new(&project.path).join(relative_file_path), "test").unwrap();
let result = super::delta::register_file_change(&project, &repo, &relative_file_path);
println!("{:?}", result);
assert!(result.is_ok());
let maybe_session_deltas = result.unwrap();
assert!(maybe_session_deltas.is_some());

View File

@ -34,35 +34,27 @@ impl<'a> SessionWatcher {
match self
.projects_storage
.get_project(&project_id)
.with_context(|| {
format!("Error while getting project {} for git watcher", project_id)
})? {
.with_context(|| format!("{}: failed to get project", project_id))?
{
Some(project) => {
let user = self.users_storage.get().with_context(|| {
format!(
"Error while getting user for git watcher in {}",
project.path
)
})?;
let user = self
.users_storage
.get()
.with_context(|| format!("{}: failed to get user", project.id))?;
match self.check_for_changes(&project, &user)? {
Some(session) => {
sender
.send(events::Event::session(&project, &session))
.with_context(|| {
format!(
"Error while sending session event for git watcher in {}",
project.path
)
format!("{}: failed to send session event", project.id)
})?;
Ok(())
}
None => Ok(()),
}
}
None => {
log::error!("Project {} not found for git watcher", project_id);
Ok(())
}
None => Err(anyhow::anyhow!("project not found")),
}
}
@ -71,7 +63,7 @@ impl<'a> SessionWatcher {
sender: mpsc::Sender<events::Event>,
project: projects::Project,
) -> Result<()> {
log::info!("Watching sessions for {}", project.path);
log::info!("{}: watching sessions in {}", project.id, project.path);
let shared_self = self.clone();
let mut self_copy = shared_self.clone();
@ -80,7 +72,7 @@ impl<'a> SessionWatcher {
tauri::async_runtime::spawn_blocking(move || loop {
let local_self = &mut self_copy;
if let Err(e) = local_self.run(&project_id, sender.clone()) {
log::error!("Error while running git watcher: {:#}", e);
log::error!("{}: error while running git watcher: {:#}", project_id, e);
}
thread::sleep(Duration::from_secs(10));
});
@ -101,18 +93,19 @@ impl<'a> SessionWatcher {
project: &projects::Project,
user: &Option<users::User>,
) -> Result<Option<sessions::Session>> {
let repo = git2::Repository::open(project.path.clone())?;
let repo = git2::Repository::open(project.path.clone())
.with_context(|| format!("{}: failed to open repository", project.id))?;
match session_to_commit(&repo, project)
.with_context(|| "Error while checking for session to commit")?
.with_context(|| "failed to check for session to comit")?
{
None => Ok(None),
Some(mut session) => {
session
.flush(&repo, user, project)
.with_context(|| "Error while flushing session")?;
.with_context(|| format!("failed to flush session {}", session.id))?;
self.deltas_searcher
.index_session(&repo, &project, &session)
.with_context(|| format!("Error while indexing session {}", session.id))?;
.with_context(|| format!("failed to index session {}", session.id))?;
Ok(Some(session))
}
}
@ -126,10 +119,10 @@ fn session_to_commit(
repo: &Repository,
project: &projects::Project,
) -> Result<Option<sessions::Session>> {
match sessions::Session::current(repo, project)? {
None => {
Ok(None)
}
match sessions::Session::current(repo, project)
.with_context(|| format!("{}: failed to get current session", project.id))?
{
None => Ok(None),
Some(current_session) => {
let now = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
@ -140,11 +133,19 @@ fn session_to_commit(
let elapsed_start = now - current_session.meta.start_timestamp_ms;
if (elapsed_last > FIVE_MINUTES) || (elapsed_start > ONE_HOUR) {
log::info!(
"{}: ready to commit {} ({} seconds elapsed, {} seconds since start)",
project.id,
project.path,
elapsed_last / 1000,
elapsed_start / 1000
);
Ok(Some(current_session))
} else {
log::debug!(
"Not ready to commit {} yet. ({} seconds elapsed, {} seconds since start)",
repo.workdir().unwrap().display(),
"{}: not ready to commit {} yet. ({} seconds elapsed, {} seconds since start)",
project.id,
project.path,
elapsed_last / 1000,
elapsed_start / 1000
);