Merge pull request #150 from SamSchott/path-improvements

Path improvements
This commit is contained in:
SamSchott 2020-05-25 13:41:08 +02:00 committed by GitHub
commit 1b016ed6b6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 217 additions and 72 deletions

View File

@ -13,7 +13,6 @@ be kept free of memory heavy imports.
import os
import platform
import sys
import tempfile
from enum import Enum

View File

@ -60,10 +60,10 @@ from maestral.errors import (
from maestral.utils.content_hasher import DropboxContentHasher
from maestral.utils.notify import MaestralDesktopNotifier, FILECHANGE
from maestral.utils.path import (
generate_cc_name, path_exists_case_insensitive, to_cased_path, is_fs_case_sensitive,
generate_cc_name, cased_path_candidates, to_cased_path, is_fs_case_sensitive,
move, delete, is_child, is_equal_or_child
)
from maestral.utils.appdirs import get_data_path
from maestral.utils.appdirs import get_data_path, get_home_dir
logger = logging.getLogger(__name__)
@ -475,7 +475,8 @@ class SyncEngine:
self._mignore_path = osp.join(self._dropbox_path, MIGNORE_FILE)
self._file_cache_path = osp.join(self._dropbox_path, FILE_CACHE)
self._rev_file_path = get_data_path('maestral', f'{self.config_name}.index')
self._is_case_sensitive = is_fs_case_sensitive(self._dropbox_path)
# check for home, update later
self._is_case_sensitive = is_fs_case_sensitive(get_home_dir())
self._rev_dict_cache = dict()
self._load_rev_dict_from_file(raise_exception=True)
@ -951,11 +952,9 @@ class SyncEngine:
dbx_path = dbx_path.replace('/', osp.sep)
dbx_path_parent, dbx_path_basename = osp.split(dbx_path)
local_parent = to_cased_path(dbx_path_parent, root=self.dropbox_path)
local_parent = to_cased_path(dbx_path_parent, root=self.dropbox_path,
is_fs_case_sensitive=self.is_case_sensitive)
if local_parent == '':
return osp.join(self.dropbox_path, dbx_path.lstrip(osp.sep))
else:
return osp.join(local_parent, dbx_path_basename)
def get_local_path(self, md):
@ -1538,11 +1537,11 @@ class SyncEngine:
case. Renames items if necessary. Only needed for case sensitive file systems.
:param FileSystemEvent event: Created or moved event.
:returns: ``True`` or ``False``.
:returns: Whether a case conflict was detected and handled.
:rtype: bool
"""
if not self._is_case_sensitive:
if not self.is_case_sensitive:
return False
if event.event_type not in (EVENT_TYPE_CREATED, EVENT_TYPE_MOVED):
@ -1553,9 +1552,10 @@ class SyncEngine:
dirname, basename = osp.split(local_path)
# check number of paths with the same case
if len(path_exists_case_insensitive(basename, root=dirname)) > 1:
if len(cased_path_candidates(basename, root=dirname)) > 1:
local_path_cc = generate_cc_name(local_path, suffix='case conflict')
local_path_cc = generate_cc_name(local_path, suffix='case conflict',
is_fs_case_sensitive=self.is_case_sensitive)
event_cls = DirMovedEvent if osp.isdir(local_path) else FileMovedEvent
with self.fs_events.ignore(event_cls(local_path, local_path_cc)):
@ -1589,7 +1589,8 @@ class SyncEngine:
if self.is_excluded_by_user(dbx_path):
local_path_cc = generate_cc_name(local_path,
suffix='selective sync conflict')
suffix='selective sync conflict',
is_fs_case_sensitive=self.is_case_sensitive)
event_cls = DirMovedEvent if osp.isdir(local_path) else FileMovedEvent
with self.fs_events.ignore(event_cls(local_path, local_path_cc)):
@ -2502,7 +2503,9 @@ class SyncEngine:
# re-check for conflict and move the conflict
# out of the way if anything has changed
if self._check_download_conflict(entry) == Conflict.Conflict:
new_local_path = generate_cc_name(local_path)
new_local_path = generate_cc_name(
local_path, is_fs_case_sensitive=self.is_case_sensitive
)
event_cls = DirMovedEvent if osp.isdir(local_path) else FileMovedEvent
with self.fs_events.ignore(event_cls(local_path, new_local_path)):
exc = move(local_path, new_local_path)
@ -2539,7 +2542,9 @@ class SyncEngine:
# replace it but leave the children as they are.
if conflict_check == Conflict.Conflict:
new_local_path = generate_cc_name(local_path)
new_local_path = generate_cc_name(
local_path, is_fs_case_sensitive=self.is_case_sensitive
)
event_cls = DirMovedEvent if osp.isdir(local_path) else FileMovedEvent
with self.fs_events.ignore(event_cls(local_path, new_local_path)):
exc = move(local_path, new_local_path)

View File

@ -12,13 +12,32 @@ This module contains functions for common path operations used by Maestral.
import os
import os.path as osp
import shutil
import tempfile
import itertools
def _path_components(path):
components = path.strip(osp.sep).split(osp.sep)
cleaned_components = [c for c in components if c]
return cleaned_components
def is_fs_case_sensitive(path):
# create a cased temp file and check if the lower case version exists
with tempfile.NamedTemporaryFile(dir=path, prefix='.TmP') as tmp_file:
return not os.path.exists(tmp_file.name.lower())
"""
Checks if ``path`` lies on a partition with a case-sensitive file system.
:param str path: Path to check.
:returns: Whether ``path`` lies on a partition with a case-sensitive file system.
:rtype: bool
"""
if path.islower():
check_path = path.upper()
else:
check_path = path.lower()
if osp.exists(path) and not osp.exists(check_path):
return True
else:
return not osp.samefile(path, check_path)
def is_child(path, parent):
@ -28,8 +47,7 @@ def is_child(path, parent):
:param str path: Item path.
:param str parent: Parent path.
:returns: ``True`` if ``path`` semantically lies inside ``parent`` or
``path == parent``, ``False`` otherwise.
:returns: Whether ``path`` semantically lies inside ``parent``.
:rtype: bool
"""
@ -54,66 +72,140 @@ def is_equal_or_child(path, parent):
return is_child(path, parent) or path == parent
def path_exists_case_insensitive(path, root='/'):
def cased_path_candidates(path, root=osp.sep, is_fs_case_sensitive=True):
"""
Checks if a ``path`` exists in given ``root`` directory, similar to ``os.path.exists``
but case-insensitive. A list of all case-insensitive matches is returned.
Returns a list of cased versions of the given path as far as corresponding nodes
exist in the given root directory. For instance, if a case sensitive root directory
contains two folders "/parent/subfolder/child" and "/parent/Subfolder/child",
there will be two matches for "/parent/subfolder/child/file.txt". If the root
directory does not exist, only one candidate ``os.path.join(root, path)`` is returned.
:param str path: Path relative to ``root``.
:param str root: Directory where we will look for ``path``. There are significant
:param str path: Original path relative to ``root``.
:param str root: Parent directory to search in. There are significant
performance improvements if a root directory with a small tree is given.
:return: List of absolute and case-sensitive to search results.
:param bool is_fs_case_sensitive: Bool indicating if the file system is case
sensitive. If ``False``, we know that there can be at most one match and choose
a faster algorithm.
:returns: Candidates for correctly cased local paths.
:rtype: list[str]
"""
if not osp.isdir(root):
return []
path = path.lstrip(osp.sep)
if path in ('', '/'):
if path == '':
return [root]
path_list = path.lstrip(osp.sep).split(osp.sep)
path_list_lower = [x.lower() for x in path_list]
path_list = _path_components(path)
n_components = len(path_list)
n_components_root = len(_path_components(root))
candidates = {-1: [root]}
i = 0
local_paths = []
for root, dirs, files in os.walk(root):
for d in list(dirs):
if d.lower() != path_list_lower[i]:
dirs.remove(d)
for f in list(files):
if f.lower() != path_list_lower[i]:
files.remove(f)
local_paths = [osp.join(root, name) for name in dirs + files]
n_components_current_root = len(_path_components(root))
depth = n_components_current_root - n_components_root
i += 1
if i == len(path_list_lower):
all_dirs = dirs.copy()
all_files = files.copy()
dirs.clear()
files.clear()
if depth >= n_components:
if is_fs_case_sensitive:
continue
else:
break
found = False
path_lower = path_list[depth].lower()
for d in all_dirs:
if d.lower() == path_lower:
dirs.append(d)
if not is_fs_case_sensitive:
# skip to next iteration since there can be no more matches
found = True
break
if depth + 1 == n_components and not found:
# look at files
for f in all_files:
if f.lower() == path_lower:
files.append(f)
if not is_fs_case_sensitive:
# skip to next iteration since there can be no more matches
break
new_candidates = [osp.join(root, name) for name in itertools.chain(dirs, files)]
if new_candidates:
try:
candidates[depth].extend(new_candidates)
except KeyError:
candidates[depth] = new_candidates
i_max = max(candidates.keys())
local_paths = [osp.join(node, *path_list[i_max + 1:]) for node in candidates[i_max]]
return local_paths
def to_cased_path(path, root='/'):
def to_cased_path(path, root=osp.sep, is_fs_case_sensitive=True):
"""
Returns a cased version of the given path, if exists in the given root directory,
or an empty string otherwise.
Returns a cased version of the given path as far as corresponding nodes exist in the
given root directory. If multiple matches are found, only one is returned. If ``path``
does not exist in root ``root`` or ``root`` does not exist, the return value will be
``os.path.join(root, path)``.
:param str path: Original path.
:param str root: Parent directory to search in.
:returns: Absolute and cased version of given path or empty string.
:param str path: Original path relative to ``root``.
:param str root: Parent directory to search in. There are significant
performance improvements if a root directory with a small tree is given.
:param bool is_fs_case_sensitive: Bool indicating if the file system is case
sensitive. If ``False``, we know that there can be at most one match and choose
a faster algorithm.
:returns: Candidates for c
:returns: Absolute and cased version of given path.
:rtype: str
"""
path_list = path_exists_case_insensitive(path, root)
candidates = cased_path_candidates(path, root, is_fs_case_sensitive)
return candidates[0]
def path_exists_case_insensitive(path, root=osp.sep, is_fs_case_sensitive=True):
"""
Checks if a ``path`` exists in given ``root`` directory, similar to ``os.path.exists``
but case-insensitive.
:param str path: Path relative to ``root``.
:param str root: Directory where we will look for ``path``. There are significant
performance improvements if a root directory with a small tree is given.
:param bool is_fs_case_sensitive: Bool indicating if the file system is case
sensitive. If ``False``, we know that there can be at most one match and choose
a faster algorithm.
:returns: Whether an arbitrarily cased version of ``path`` exists.
:rtype: bool
"""
if is_fs_case_sensitive:
candidates = cased_path_candidates(path, root, is_fs_case_sensitive)
for c in candidates:
if osp.exists(c):
return True
return False
if len(path_list) > 0:
return path_list[0]
else:
return ''
return osp.exists(osp.join(root, path.lstrip(osp.sep)))
def generate_cc_name(path, suffix='conflicting copy'):
def generate_cc_name(path, suffix='conflicting copy', is_fs_case_sensitive=True):
"""
Generates a path for a conflicting copy of ``path``. The file name is created by
inserting the given ``suffix`` between the the filename and extension. For instance:
@ -127,6 +219,9 @@ def generate_cc_name(path, suffix='conflicting copy'):
:param str path: Original path name.
:param str suffix: Suffix to use. Defaults to 'conflicting copy'.
:param bool is_fs_case_sensitive: Bool indicating if the file system is case
sensitive. If ``False``, we know that there can be at most one match and choose
a faster algorithm.
:returns: New path.
:rtype: str
"""
@ -137,7 +232,7 @@ def generate_cc_name(path, suffix='conflicting copy'):
i = 0
cc_candidate = f'{filename} ({suffix}){ext}'
while path_exists_case_insensitive(cc_candidate, dirname):
while path_exists_case_insensitive(cc_candidate, dirname, is_fs_case_sensitive):
i += 1
cc_candidate = f'{filename} ({suffix} {i}){ext}'

View File

@ -18,13 +18,12 @@ from maestral.sync import (
DirCreatedEvent, DirDeletedEvent, DirMovedEvent,
)
from maestral.sync import delete, move
from maestral.sync import is_child
from maestral.sync import is_child, is_fs_case_sensitive
from maestral.sync import get_local_hash, DirectorySnapshot
from maestral.sync import SyncEngine, Observer, FSEventHandler
from maestral.errors import NotFoundError, FolderConflictError
from maestral.main import Maestral
from maestral.main import get_log_path
from maestral.constants import IS_FS_CASE_SENSITIVE
import unittest
from unittest import TestCase
@ -983,7 +982,7 @@ class TestSync(TestCase):
self.assertIsNotNone(self.m.client.get_metadata(self.test_folder_dbx + '/folder (selective sync conflict)'))
self.assertIsNotNone(self.m.client.get_metadata(self.test_folder_dbx + '/folder (selective sync conflict 1)'))
@unittest.skipUnless(IS_FS_CASE_SENSITIVE, 'file system is not case sensitive')
@unittest.skipUnless(is_fs_case_sensitive('/home'), 'file system is not case sensitive')
def test_case_conflict(self):
os.mkdir(self.test_folder_local + '/folder')

View File

@ -6,10 +6,13 @@
"""
import os
import os.path as osp
import tempfile
from maestral.utils.path import (
path_exists_case_insensitive, to_cased_path, is_child, delete
path_exists_case_insensitive, cased_path_candidates, to_cased_path,
is_fs_case_sensitive, is_child, delete
)
from maestral.utils.appdirs import get_home_dir
def test_path_exists_case_insensitive():
@ -21,16 +24,60 @@ def test_path_exists_case_insensitive():
assert to_cased_path(path.upper()) == path
# choose a random path that likely does not exist
root = '/'
path = '/usr/local/share/test_folder/path_928'
if not os.path.exists(path):
assert len(path_exists_case_insensitive(path, root)) == 0
if not osp.exists(path):
assert not path_exists_case_insensitive(path)
# choose a random parent that likely does not exist
root = '/test_folder/path_928'
path = '/usr'
if not os.path.exists(root):
assert len(path_exists_case_insensitive(path, root)) == 0
path = '/test_folder/path_928'
root = '/usr'
if not osp.exists(root):
assert not path_exists_case_insensitive(path, root)
def test_cased_path_candidates():
# choose a path which exists on all Unix systems
path = '/usr/local/share'.upper()
candidates = cased_path_candidates(path)
assert len(candidates) == 1
assert '/usr/local/share' in candidates
candidates = cased_path_candidates('/test', root='/usr/local/share')
assert len(candidates) == 1
assert '/usr/local/share/test' in candidates
home = get_home_dir()
if is_fs_case_sensitive(home):
parent0 = osp.join(home, 'test folder/subfolder')
parent1 = osp.join(home, 'Test Folder/subfolder')
os.makedirs(parent0)
os.makedirs(parent1)
path = osp.join(parent0.lower(), 'File.txt')
try:
candidates = cased_path_candidates(path)
assert len(candidates) == 2
assert osp.join(parent0, 'File.txt') in candidates
assert osp.join(parent1, 'File.txt') in candidates
candidates = cased_path_candidates('/test folder/subfolder/File.txt',
root=home)
assert len(candidates) == 2
assert osp.join(parent0, 'File.txt') in candidates
assert osp.join(parent1, 'File.txt') in candidates
finally:
delete(parent0)
delete(parent1)
def test_is_child():
@ -43,12 +90,12 @@ def test_is_child():
def test_delete():
# test deleting file
test_file = tempfile.NamedTemporaryFile()
assert os.path.isfile(test_file.name)
assert osp.isfile(test_file.name)
delete(test_file.name)
assert not os.path.exists(test_file.name)
assert not osp.exists(test_file.name)
# test deleting directory
test_dir = tempfile.TemporaryDirectory()
assert os.path.isdir(test_dir.name)
assert osp.isdir(test_dir.name)
delete(test_dir.name)
assert not os.path.exists(test_dir.name)
assert not osp.exists(test_dir.name)