support bytes and str paths from fsevents

This commit is contained in:
Sam Schott 2024-09-08 20:11:00 +02:00
parent 0027ed0c57
commit 96523ed668
3 changed files with 38 additions and 28 deletions

View File

@ -48,7 +48,7 @@ __all__ = [
]
def get_dest_path(event: FileSystemEvent) -> str:
def get_dest_path(event: FileSystemEvent) -> str | bytes:
"""
Returns the dest_path of a file system event if present (moved events only)
otherwise returns the src_path (which is also the "destination").
@ -425,7 +425,7 @@ class SyncEvent(Model):
change_time = stat.st_ctime if stat else None
size = stat.st_size if stat else 0
try:
symlink_target = os.readlink(event.src_path)
symlink_target = os.readlink(os.fsdecode(event.src_path))
except OSError:
symlink_target = None

View File

@ -1066,7 +1066,7 @@ class SyncEngine:
# ==== Content hashing =============================================================
def get_local_hash(self, local_path: str) -> str | None:
def get_local_hash(self, local_path: str | bytes) -> str | None:
"""
Computes content hash of a local file.
@ -1074,6 +1074,8 @@ class SyncEngine:
:returns: Content hash to compare with Dropbox's content hash, or 'folder' if
the path points to a directory. ``None`` if there is nothing at the path.
"""
local_path = os.fsdecode(local_path)
try:
stat = os.lstat(local_path)
except (FileNotFoundError, NotADirectoryError):
@ -1360,7 +1362,7 @@ class SyncEngine:
return dbx_path_cased
def to_dbx_path(self, local_path: str) -> str:
def to_dbx_path(self, local_path: str | bytes) -> str:
"""
Converts a local path to a path relative to the Dropbox folder. Casing of the
given ``local_path`` will be preserved.
@ -1369,15 +1371,15 @@ class SyncEngine:
:returns: Relative path with respect to Dropbox folder.
:raises ValueError: When the path lies outside the local Dropbox folder.
"""
if not is_equal_or_child(
local_path, self.dropbox_path, self.is_fs_case_sensitive
):
raise ValueError(f'"{local_path}" is not in "{self.dropbox_path}"')
path = os.fsdecode(local_path)
if not is_equal_or_child(path, self.dropbox_path, self.is_fs_case_sensitive):
raise ValueError(f'"{path}" is not in "{self.dropbox_path}"')
return "/" + removeprefix(
local_path, self.dropbox_path, self.is_fs_case_sensitive
path, self.dropbox_path, self.is_fs_case_sensitive
).lstrip("/")
def to_dbx_path_lower(self, local_path: str) -> str:
def to_dbx_path_lower(self, local_path: str | bytes) -> str:
"""
Converts a local path to a path relative to the Dropbox folder. The path will be
normalized as on Dropbox servers (lower case and some additional
@ -1414,7 +1416,7 @@ class SyncEngine:
dbx_path_cased = self.correct_case(dbx_path)
return self.to_local_path_from_cased(dbx_path_cased)
def is_excluded(self, path: str) -> bool:
def is_excluded(self, path: str | bytes) -> bool:
"""
Checks if a file is excluded from sync. Certain file names are always excluded
from syncing, following the Dropbox support article:
@ -1429,6 +1431,7 @@ class SyncEngine:
just a file name. Does not need to be normalized.
:returns: Whether the path is excluded from syncing.
"""
path = os.fsdecode(path)
dirname, basename = osp.split(path)
# Is in excluded files?
@ -1996,7 +1999,9 @@ class SyncEngine:
# from sync.
# mapping of path -> event history
events_for_path: defaultdict[str, list[FileSystemEvent]] = defaultdict(list)
events_for_path: defaultdict[str | bytes, list[FileSystemEvent]] = defaultdict(
list
)
# mapping of source deletion event -> destination creation event
moved_from_to: dict[FileSystemEvent, FileSystemEvent] = {}
@ -2119,8 +2124,8 @@ class SyncEngine:
# 0) Collect all moved and deleted events in sets.
dir_moved_paths: set[tuple[str, str]] = set()
dir_deleted_paths: set[str] = set()
dir_moved_paths: set[tuple[str | bytes, str | bytes]] = set()
dir_deleted_paths: set[str | bytes] = set()
for events in events_for_path.values():
event = events[0]
@ -2132,7 +2137,7 @@ class SyncEngine:
# 1) Combine moved events of folders and their children into one event.
if len(dir_moved_paths) > 0:
child_moved_dst_paths: set[str] = set()
child_moved_dst_paths: set[str | bytes] = set()
# For each event, check if it is a child of a moved event discard it if yes.
for events in events_for_path.values():
@ -2151,7 +2156,7 @@ class SyncEngine:
# 2) Combine deleted events of folders and their children to one event.
if len(dir_deleted_paths) > 0:
child_deleted_paths: set[str] = set()
child_deleted_paths: set[str | bytes] = set()
for events in events_for_path.values():
event = events[0]
@ -3737,7 +3742,7 @@ class SyncEngine:
self._logger.debug('Renamed "%s" to "%s"', local_path_old, event.local_path)
def rescan(self, local_path: str) -> None:
def rescan(self, local_path: str | bytes) -> None:
"""
Forces a rescan of a local path: schedules created events for every folder,
modified events for every file and deleted events for every deleted item

View File

@ -2,6 +2,8 @@
This module contains functions for common path operations.
"""
from __future__ import annotations
import errno
import fcntl
import itertools
@ -36,7 +38,9 @@ _AnyPath = Union[str, bytes, "os.PathLike[str]", "os.PathLike[bytes]"]
# ==== path relationships ==============================================================
def is_child(path: str, parent: str, case_sensitive: bool = True) -> bool:
def is_child(
path: str | bytes, parent: str | bytes, case_sensitive: bool = True
) -> bool:
"""
Checks if ``path`` semantically is inside ``parent``. Neither path needs to
refer to an actual item on the drive. This function is case-sensitive.
@ -49,6 +53,9 @@ def is_child(path: str, parent: str, case_sensitive: bool = True) -> bool:
if not case_sensitive:
path = normalize(path)
parent = normalize(parent)
else:
path = os.fsdecode(path)
parent = os.fsdecode(parent)
parent = parent.rstrip(osp.sep) + osp.sep
path = path.rstrip(osp.sep)
@ -56,7 +63,9 @@ def is_child(path: str, parent: str, case_sensitive: bool = True) -> bool:
return path.startswith(parent)
def is_equal_or_child(path: str, parent: str, case_sensitive: bool = True) -> bool:
def is_equal_or_child(
path: str | bytes, parent: str | bytes, case_sensitive: bool = True
) -> bool:
"""
Checks if ``path`` semantically is inside ``parent`` or equals ``parent``. Neither
path needs to refer to an actual item on the drive. This function is case-sensitive.
@ -67,11 +76,7 @@ def is_equal_or_child(path: str, parent: str, case_sensitive: bool = True) -> bo
:returns: ``True`` if ``path`` semantically lies inside ``parent`` or
``path == parent``.
"""
if not case_sensitive:
path = normalize(path)
parent = normalize(parent)
return is_child(path, parent) or path == parent
return is_child(path, parent, case_sensitive) or path == parent
# ==== case sensitivity and normalization ==============================================
@ -98,7 +103,7 @@ def normalize_unicode(string: str) -> str:
return unicodedata.normalize("NFC", string)
def normalize(string: str) -> str:
def normalize(path: str | bytes) -> str:
"""
Replicates the path normalization performed by Dropbox servers. This typically only
involves converting the path to lower case, with a few (undocumented) exceptions:
@ -121,7 +126,7 @@ def normalize(string: str) -> str:
:param string: Original path.
:returns: Normalized path.
"""
return normalize_case(normalize_unicode(string))
return normalize_case(normalize_unicode(os.fsdecode(path)))
def is_fs_case_sensitive(path: str) -> bool:
@ -417,7 +422,7 @@ def move(
def walk(
root: str,
root: str | bytes,
listdir: Callable[[str], Iterable["os.DirEntry[str]"]] = os.scandir,
) -> Iterator[Tuple[str, os.stat_result]]:
"""
@ -427,7 +432,7 @@ def walk(
:param listdir: Function to call to get the folder content.
:returns: Iterator over (path, stat) results.
"""
for entry in listdir(root):
for entry in listdir(os.fsdecode(root)):
try:
path = entry.path
stat = entry.stat(follow_symlinks=False)