[tests] migrate all to pytest

This commit is contained in:
Sam Schott 2020-11-19 14:00:32 +00:00
parent 087064af2c
commit 843a8fde6a
12 changed files with 1446 additions and 1550 deletions

View File

@ -5,39 +5,36 @@ import logging
import time import time
from datetime import datetime from datetime import datetime
import uuid import uuid
from typing import Optional
import pytest
from dropbox.files import WriteMode, FileMetadata from dropbox.files import WriteMode, FileMetadata
from maestral.main import Maestral from maestral.main import Maestral
from maestral.errors import NotFoundError, FileConflictError from maestral.errors import NotFoundError, FileConflictError
from maestral.client import convert_api_errors from maestral.client import convert_api_errors
from maestral.utils.housekeeping import remove_configuration from maestral.utils.housekeeping import remove_configuration
from maestral.utils.path import generate_cc_name, delete from maestral.utils.path import (
generate_cc_name,
delete,
to_existing_cased_path,
is_child,
)
from maestral.sync import DirectorySnapshot
from maestral.utils.appdirs import get_home_dir from maestral.utils.appdirs import get_home_dir
env_token = os.environ.get("DROPBOX_TOKEN", "") resources = os.path.dirname(__file__) + "/resources"
def setup_test_config( @pytest.fixture
config_name: str = "test-config", access_token: Optional[str] = env_token def m():
) -> Maestral: config_name = "test-config"
"""
Sets up a new maestral configuration and links it to a Dropbox account with the
given token. Creates a new local Dropbox folder for the config. The token must be an
"access token" which can be used to directly make Dropbox API calls and not a
"refresh token". Both short lived and long lived access token will work but short
lived tokens must not expire before the tests are complete.
:param config_name: Config name to use or create.
:param access_token: The access token to use to link the config to an account.
:returns: A linked Maestral instance.
"""
m = Maestral(config_name) m = Maestral(config_name)
m.log_level = logging.DEBUG m.log_level = logging.DEBUG
# link with given token # link with given token
access_token = os.environ.get("DROPBOX_TOKEN", "")
m.client._init_sdk_with_token(access_token=access_token) m.client._init_sdk_with_token(access_token=access_token)
# get corresponding Dropbox ID and store in keyring for other processes # get corresponding Dropbox ID and store in keyring for other processes
@ -47,33 +44,40 @@ def setup_test_config(
m.client.auth._token_access_type = "legacy" m.client.auth._token_access_type = "legacy"
m.client.auth.save_creds() m.client.auth.save_creds()
# set local Dropbox directory
home = get_home_dir() home = get_home_dir()
local_dropbox_dir = generate_cc_name( local_dropbox_dir = generate_cc_name(home + "/Dropbox", suffix="test runner")
os.path.join(home, "Dropbox"), suffix="test runner"
)
m.create_dropbox_directory(local_dropbox_dir) m.create_dropbox_directory(local_dropbox_dir)
return m # acquire test lock and perform initial sync
lock = DropboxTestLock(m)
if not lock.acquire(timeout=60 * 60):
raise TimeoutError("Could not acquire test lock")
# create / clean our temporary test folder
m.test_folder_dbx = "/sync_tests"
m.test_folder_local = m.to_local_path(m.test_folder_dbx)
def cleanup_test_config(m: Maestral, test_folder_dbx: Optional[str] = None) -> None: try:
""" m.client.remove(m.test_folder_dbx)
Shuts down syncing for the given Maestral instance, removes all local files and except NotFoundError:
folders related to that instance, including the local Dropbox folder, and removes pass
any '.mignore' files. m.client.make_dir(m.test_folder_dbx)
:param m: Maestral instance. # start syncing
:param test_folder_dbx: Optional test folder to clean up. m.start_sync()
""" wait_for_idle(m)
# return synced and running instance
yield m
# stop syncing and clean up remote folder # stop syncing and clean up remote folder
m.stop_sync() m.stop_sync()
if test_folder_dbx: try:
try: m.client.remove(m.test_folder_dbx)
m.client.remove(test_folder_dbx) except NotFoundError:
except NotFoundError: pass
pass
try: try:
m.client.remove("/.mignore") m.client.remove("/.mignore")
@ -87,6 +91,87 @@ def cleanup_test_config(m: Maestral, test_folder_dbx: Optional[str] = None) -> N
delete(m.dropbox_path) delete(m.dropbox_path)
remove_configuration(m.config_name) remove_configuration(m.config_name)
# release lock
lock.release()
# helper functions
def wait_for_idle(m: Maestral, minimum: int = 4):
"""Blocks until Maestral instance is idle for at least `minimum` sec."""
t0 = time.time()
while time.time() - t0 < minimum:
if m.sync.busy():
m.monitor._wait_for_idle()
t0 = time.time()
else:
time.sleep(0.1)
def assert_synced(m: Maestral):
"""Asserts that the `local_folder` and `remote_folder` are synced."""
remote_items = m.list_folder("/", recursive=True)
local_snapshot = DirectorySnapshot(m.dropbox_path)
# assert that all items from server are present locally
# with the same content hash
for r in remote_items:
dbx_path = r["path_display"]
local_path = to_existing_cased_path(dbx_path, root=m.dropbox_path)
remote_hash = r["content_hash"] if r["type"] == "FileMetadata" else "folder"
assert (
m.sync.get_local_hash(local_path) == remote_hash
), f'different file content for "{dbx_path}"'
# assert that all local items are present on server
for path in local_snapshot.paths:
if not m.sync.is_excluded(path) and is_child(path, m.dropbox_path):
if not m.sync.is_excluded(path):
dbx_path = m.sync.to_dbx_path(path).lower()
matching_items = list(
r for r in remote_items if r["path_lower"] == dbx_path
)
assert (
len(matching_items) == 1
), f'local item "{path}" does not exist on dbx'
# check that our index is correct
for entry in m.sync.get_index():
if is_child(entry.dbx_path_lower, "/"):
# check that there is a match on the server
matching_items = list(
r for r in remote_items if r["path_lower"] == entry.dbx_path_lower
)
assert (
len(matching_items) == 1
), f'indexed item "{entry.dbx_path_lower}" does not exist on dbx'
r = matching_items[0]
remote_rev = r["rev"] if r["type"] == "FileMetadata" else "folder"
# check if revs are equal on server and locally
assert (
entry.rev == remote_rev
), f'different revs for "{entry.dbx_path_lower}"'
# check if casing on drive is the same as in index
local_path_expected_casing = m.dropbox_path + entry.dbx_path_cased
local_path_actual_casing = to_existing_cased_path(
local_path_expected_casing
)
assert (
local_path_expected_casing == local_path_actual_casing
), "casing on drive does not match index"
# test lock
class DropboxTestLock: class DropboxTestLock:
""" """

View File

@ -1,52 +0,0 @@
import os
import unittest
import subprocess
from unittest import TestCase
import Pyro5.errors
from maestral.daemon import MaestralProxy
from .fixtures import setup_test_config, cleanup_test_config, DropboxTestLock
@unittest.skipUnless(os.environ.get("DROPBOX_TOKEN"), "Requires auth token")
class TestCLI(TestCase):
config_name = "cli-test-config"
@classmethod
def setUpClass(cls):
# link to an existing Dropbox account
cls.m = setup_test_config(cls.config_name)
cls.lock = DropboxTestLock(cls.m)
if not cls.lock.acquire(timeout=60 * 60):
raise TimeoutError("Could not acquire test lock")
@classmethod
def tearDownClass(cls):
# clean up linking and config
if hasattr(cls, "m"):
cleanup_test_config(cls.m)
if hasattr(cls, "lock"):
cls.lock.release()
def test_start_stop(self):
subprocess.run(["maestral", "start", "-c", self.config_name])
with MaestralProxy(self.config_name) as m:
self.assertTrue(m.running)
self.assertTrue(m.syncing)
subprocess.run(["maestral", "stop", "-c", self.config_name])
with self.assertRaises(Pyro5.errors.CommunicationError):
MaestralProxy(self.config_name)
if __name__ == "__main__":
unittest.main()

View File

@ -1,221 +1,207 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
"""
@author: Sam Schott (ss2151@cam.ac.uk)
(c) Sam Schott; This work is licensed under the MIT licence.
"""
import os import os
import os.path as osp import os.path as osp
import time
import unittest
from unittest import TestCase
from maestral.errors import NotFoundError, PathError import pytest
from maestral.errors import NotFoundError
from maestral.main import FileStatus, IDLE from maestral.main import FileStatus, IDLE
from maestral.main import logger as maestral_logger from maestral.main import logger as maestral_logger
from maestral.utils.path import delete from maestral.utils.path import delete
from .fixtures import setup_test_config, cleanup_test_config, DropboxTestLock from .fixtures import wait_for_idle, m
@unittest.skipUnless(os.environ.get("DROPBOX_TOKEN"), "Requires auth token") if not os.environ.get("DROPBOX_TOKEN"):
class TestAPI(TestCase): pytest.skip("Requires auth token", allow_module_level=True)
config_name = "api-test-config"
TEST_FOLDER_PATH = "/sync_tests" # API unit tests
resources = osp.dirname(__file__) + "/resources"
def setUp(self):
self.m = setup_test_config(self.config_name) def test_status_properties(m):
self.lock = DropboxTestLock(self.m)
if not self.lock.acquire(timeout=60 * 60):
raise TimeoutError("Could not acquire test lock")
# all our tests will be carried out within this folder assert not m.pending_link
self.test_folder_dbx = TestAPI.TEST_FOLDER_PATH assert not m.pending_dropbox_folder
self.test_folder_local = self.m.dropbox_path + self.TEST_FOLDER_PATH
# create / clean our temporary test folder assert m.status == IDLE
try: assert m.running
self.m.client.remove(self.test_folder_dbx) assert m.connected
except NotFoundError: assert m.syncing
pass assert not m.paused
self.m.client.make_dir(self.test_folder_dbx) assert not m.sync_errors
assert not m.fatal_errors
# start syncing maestral_logger.info("test message")
self.m.start_sync() assert m.status == "test message"
# wait until initial sync has completed
self.wait_for_idle()
def tearDown(self): def test_file_status(m):
cleanup_test_config(self.m, self.test_folder_dbx) # test synced folder
self.lock.release() file_status = m.get_file_status(m.test_folder_local)
assert file_status == FileStatus.Synced.value
# helper functions # test unwatched outside of dropbox
file_status = m.get_file_status("/url/local")
assert file_status == FileStatus.Unwatched.value
def wait_for_idle(self, minimum=4): # test unwatched non-existent
"""Blocks until Maestral is idle for at least `minimum` sec.""" file_status = m.get_file_status("/this is not a folder")
assert file_status == FileStatus.Unwatched.value, file_status
t0 = time.time() # test unwatched when paused
while time.time() - t0 < minimum: m.pause_sync()
if self.m.sync.busy(): wait_for_idle(m)
self.m.monitor._wait_for_idle()
t0 = time.time()
else:
time.sleep(0.1)
# API unit tests file_status = m.get_file_status(m.test_folder_local)
assert file_status == FileStatus.Unwatched.value
def test_status_properties(self): m.resume_sync()
self.assertEqual(IDLE, self.m.status) wait_for_idle(m)
self.assertTrue(self.m.running)
self.assertTrue(self.m.connected)
self.assertTrue(self.m.syncing)
self.assertFalse(self.m.paused)
self.assertFalse(self.m.sync_errors)
self.assertFalse(self.m.fatal_errors)
maestral_logger.info("test message") # test error status
self.assertEqual(self.m.status, "test message") invalid_local_folder = m.test_folder_local + "/test_folder\\"
os.mkdir(invalid_local_folder)
wait_for_idle(m)
def test_file_status(self): file_status = m.get_file_status(invalid_local_folder)
assert file_status == FileStatus.Error.value
# test synced folder
file_status = self.m.get_file_status(self.test_folder_local)
self.assertEqual(FileStatus.Synced.value, file_status)
# test unwatched outside of dropbox def test_move_dropbox_folder(m):
file_status = self.m.get_file_status("/url/local") new_dir_short = "~/New Dropbox"
self.assertEqual(FileStatus.Unwatched.value, file_status) new_dir = osp.realpath(osp.expanduser(new_dir_short))
# test unwatched non-existent m.move_dropbox_directory(new_dir_short)
file_status = self.m.get_file_status("/this is not a folder") assert osp.isdir(new_dir)
self.assertEqual(FileStatus.Unwatched.value, file_status) assert m.dropbox_path == new_dir
# test unwatched when paused wait_for_idle(m)
self.m.pause_sync()
self.wait_for_idle()
file_status = self.m.get_file_status(self.test_folder_local) # assert that sync was resumed after moving folder
self.assertEqual(FileStatus.Unwatched.value, file_status) assert m.syncing
self.m.resume_sync()
self.wait_for_idle()
# test error status def test_move_dropbox_folder_to_itself(m):
invalid_local_folder = self.test_folder_local + "/test_folder\\"
os.mkdir(invalid_local_folder)
self.wait_for_idle()
file_status = self.m.get_file_status(invalid_local_folder) m.move_dropbox_directory(m.dropbox_path)
self.assertEqual(FileStatus.Error.value, file_status)
def test_selective_sync_api(self): # assert that sync is still running
"""Test `Maestral.exclude_item` and Maestral.include_item`.""" assert m.syncing
test_path_local = self.test_folder_local + "/selective_sync_test_folder"
test_path_local_sub = test_path_local + "/subfolder"
test_path_dbx = self.test_folder_dbx + "/selective_sync_test_folder"
test_path_dbx_sub = test_path_dbx + "/subfolder"
# create a local folder test_path_local def test_move_dropbox_folder_to_existing(m):
os.mkdir(test_path_local)
os.mkdir(test_path_local_sub)
self.wait_for_idle()
# exclude test_path_dbx from sync new_dir_short = "~/New Dropbox"
self.m.exclude_item(test_path_dbx) new_dir = osp.realpath(osp.expanduser(new_dir_short))
self.wait_for_idle() os.mkdir(new_dir)
self.assertFalse(osp.exists(test_path_local)) try:
self.assertIn(test_path_dbx, self.m.excluded_items)
self.assertEqual(self.m.excluded_status(test_path_dbx), "excluded")
self.assertEqual(self.m.excluded_status(test_path_dbx_sub), "excluded")
self.assertEqual(
self.m.excluded_status(self.test_folder_dbx), "partially excluded"
)
# include test_path_dbx in sync, check that it worked with pytest.raises(FileExistsError):
self.m.include_item(test_path_dbx) m.move_dropbox_directory(new_dir)
self.wait_for_idle()
self.assertTrue(osp.exists(test_path_local))
self.assertNotIn(test_path_dbx, self.m.excluded_items)
self.assertEqual(self.m.excluded_status(self.test_folder_dbx), "included")
self.assertEqual(self.m.excluded_status(test_path_dbx_sub), "included")
# exclude test_path_dbx again for further tests
self.m.exclude_item(test_path_dbx)
self.wait_for_idle()
# test including a folder inside test_path_dbx,
# test_path_dbx should become included itself
self.m.include_item(test_path_dbx + "/subfolder")
self.assertNotIn(
test_path_dbx,
self.m.excluded_items,
'test_path_dbx still in "excluded_items" list',
)
# test that 'folder' is removed from excluded_list on deletion
self.m.client.remove(test_path_dbx)
self.wait_for_idle()
self.assertNotIn(
test_path_dbx,
self.m.excluded_items,
'deleted item is still in "excluded_items" list',
)
# test excluding a non-existent folder
with self.assertRaises(NotFoundError):
self.m.exclude_item(test_path_dbx)
# check for fatal errors
self.assertFalse(self.m.fatal_errors)
def test_move_dropbox_folder(self):
new_dir_short = "~/New Dropbox"
new_dir = osp.realpath(osp.expanduser(new_dir_short))
self.m.move_dropbox_directory(new_dir_short)
self.assertTrue(osp.isdir(new_dir))
self.assertEqual(new_dir, self.m.dropbox_path)
self.wait_for_idle()
# assert that sync was resumed after moving folder
self.assertTrue(self.m.syncing)
def test_move_dropbox_folder_to_itself(self):
self.m.move_dropbox_directory(self.m.dropbox_path)
# assert that sync is still running # assert that sync is still running
self.assertTrue(self.m.syncing) assert m.syncing
def test_move_dropbox_folder_to_existing(self): finally:
new_dir_short = "~/New Dropbox" # cleanup
new_dir = osp.realpath(osp.expanduser(new_dir_short)) delete(new_dir)
os.mkdir(new_dir)
try:
with self.assertRaises(FileExistsError):
self.m.move_dropbox_directory(new_dir)
# assert that sync is still running
self.assertTrue(self.m.syncing)
finally:
# cleanup
delete(new_dir)
if __name__ == "__main__": # API integration tests
unittest.main()
def test_selective_sync_api(m):
"""
Test :meth:`Maestral.exclude_item`, :meth:`MaestralMaestral.include_item`,
:meth:`Maestral.excluded_status` and :meth:`Maestral.excluded_items`.
"""
dbx_dirs = [
"/sync_tests/selective_sync_test_folder",
"/sync_tests/independent_folder",
"/sync_tests/selective_sync_test_folder/subfolder_0",
"/sync_tests/selective_sync_test_folder/subfolder_1",
]
local_dirs = [m.to_local_path(dbx_path) for dbx_path in dbx_dirs]
# create folder structure
for path in local_dirs:
os.mkdir(path)
wait_for_idle(m)
# exclude "/sync_tests/selective_sync_test_folder" from sync
m.exclude_item("/sync_tests/selective_sync_test_folder")
wait_for_idle(m)
# check that local items have been deleted
assert not osp.exists(m.to_local_path("/sync_tests/selective_sync_test_folder"))
# check that `Maestral.excluded_items` only contains top-level folder
assert "/sync_tests/selective_sync_test_folder" in m.excluded_items
assert "/sync_tests/selective_sync_test_folder/subfolder_0" not in m.excluded_items
assert "/sync_tests/selective_sync_test_folder/subfolder_1" not in m.excluded_items
# check that `Maestral.excluded_status` returns the correct values
assert m.excluded_status("/sync_tests") == "partially excluded"
assert m.excluded_status("/sync_tests/independent_folder") == "included"
for dbx_path in dbx_dirs:
if dbx_path != "/sync_tests/independent_folder":
assert m.excluded_status(dbx_path) == "excluded"
# include test_path_dbx in sync, check that it worked
m.include_item("/sync_tests/selective_sync_test_folder")
wait_for_idle(m)
assert osp.exists(m.to_local_path("/sync_tests/selective_sync_test_folder"))
assert "/sync_tests/selective_sync_test_folder" not in m.excluded_items
for dbx_path in dbx_dirs:
assert m.excluded_status(dbx_path) == "included"
# test excluding a non-existent folder
with pytest.raises(NotFoundError):
m.exclude_item("/bogus_folder")
# check for fatal errors
assert not m.fatal_errors
def test_selective_sync_api_nested(m):
"""Tests special cases of nested selected sync changes."""
dbx_dirs = [
"/sync_tests/selective_sync_test_folder",
"/sync_tests/independent_folder",
"/sync_tests/selective_sync_test_folder/subfolder_0",
"/sync_tests/selective_sync_test_folder/subfolder_1",
]
local_dirs = [m.to_local_path(dbx_path) for dbx_path in dbx_dirs]
# create folder structure
for path in local_dirs:
os.mkdir(path)
wait_for_idle(m)
# exclude "/sync_tests/selective_sync_test_folder" from sync
m.exclude_item("/sync_tests/selective_sync_test_folder")
wait_for_idle(m)
# test including a folder inside "/sync_tests/selective_sync_test_folder",
# "/sync_tests/selective_sync_test_folder" should become included itself but it
# other children will still be excluded
m.include_item("/sync_tests/selective_sync_test_folder/subfolder_0")
assert "/sync_tests/selective_sync_test_folder" not in m.excluded_items
assert "/sync_tests/selective_sync_test_folder/subfolder_1" in m.excluded_items
# check for fatal errors
assert not m.fatal_errors

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,281 @@
# -*- coding: utf-8 -*-
import timeit
import pytest
from maestral.sync import (
FileCreatedEvent,
FileDeletedEvent,
FileModifiedEvent,
FileMovedEvent,
DirCreatedEvent,
DirDeletedEvent,
DirMovedEvent,
)
from maestral.sync import SyncEngine, DropboxClient
from maestral.utils.housekeeping import remove_configuration
def ipath(i):
"""Returns path names '/test 1', '/test 2', ... """
return f"/test {i}"
@pytest.fixture
def sync():
sync = SyncEngine(DropboxClient("test-config"), None)
sync.dropbox_path = "/"
yield sync
remove_configuration("test-config")
def test_single_file_events():
# only a single event for every path -> no consolidation
file_events = [
FileModifiedEvent(ipath(1)),
FileCreatedEvent(ipath(2)),
FileDeletedEvent(ipath(3)),
FileMovedEvent(ipath(4), ipath(5)),
]
res = [
FileModifiedEvent(ipath(1)),
FileCreatedEvent(ipath(2)),
FileDeletedEvent(ipath(3)),
FileMovedEvent(ipath(4), ipath(5)),
]
cleaned_events = sync._clean_local_events(file_events)
assert set(cleaned_events) == set(res)
def test_single_path_cases():
file_events = [
# created + deleted -> None
FileCreatedEvent(ipath(1)),
FileDeletedEvent(ipath(1)),
# deleted + created -> modified
FileDeletedEvent(ipath(2)),
FileCreatedEvent(ipath(2)),
# created + modified -> created
FileCreatedEvent(ipath(3)),
FileModifiedEvent(ipath(3)),
]
res = [
# created + deleted -> None
# deleted + created -> modified
FileModifiedEvent(ipath(2)),
# created + modified -> created
FileCreatedEvent(ipath(3)),
]
cleaned_events = sync._clean_local_events(file_events)
assert set(cleaned_events) == set(res)
def test_move_events():
file_events = [
# created + moved -> created
FileCreatedEvent(ipath(1)),
FileMovedEvent(ipath(1), ipath(2)),
# moved + deleted -> deleted
FileMovedEvent(ipath(1), ipath(4)),
FileDeletedEvent(ipath(4)),
# moved + moved back -> modified
FileMovedEvent(ipath(5), ipath(6)),
FileMovedEvent(ipath(6), ipath(5)),
# moved + moved -> deleted + created
# (this is currently not handled as a single moved)
FileMovedEvent(ipath(7), ipath(8)),
FileMovedEvent(ipath(8), ipath(9)),
]
res = [
# created + moved -> created
FileCreatedEvent(ipath(2)),
# moved + deleted -> deleted
FileDeletedEvent(ipath(1)),
# moved + moved back -> modified
FileModifiedEvent(ipath(5)),
# moved + moved -> deleted + created
# (this is currently not handled as a single moved)
FileDeletedEvent(ipath(7)),
FileCreatedEvent(ipath(9)),
]
cleaned_events = sync._clean_local_events(file_events)
assert set(cleaned_events) == set(res)
def test_gedit_save():
file_events = [
FileCreatedEvent(".gedit-save-UR4EC0"), # save new version to tmp file
FileModifiedEvent(".gedit-save-UR4EC0"), # modify tmp file
FileMovedEvent(ipath(1), ipath(1) + "~"), # move old version to backup
FileMovedEvent(".gedit-save-UR4EC0", ipath(1)), # replace old version with tmp
]
res = [
FileModifiedEvent(ipath(1)), # modified file
FileCreatedEvent(ipath(1) + "~"), # backup
]
cleaned_events = sync._clean_local_events(file_events)
assert set(cleaned_events) == set(res)
def test_macos_safe_save():
file_events = [
FileMovedEvent(ipath(1), ipath(1) + ".sb-b78ef837-dLht38"), # move to backup
FileCreatedEvent(ipath(1)), # create new version
FileDeletedEvent(ipath(1) + ".sb-b78ef837-dLht38"), # delete backup
]
res = [
FileModifiedEvent(ipath(1)), # modified file
]
cleaned_events = sync._clean_local_events(file_events)
assert set(cleaned_events) == set(res)
def test_msoffice_created():
file_events = [
FileCreatedEvent(ipath(1)),
FileDeletedEvent(ipath(1)),
FileCreatedEvent(ipath(1)),
FileCreatedEvent("~$" + ipath(1)),
]
res = [
FileCreatedEvent(ipath(1)), # created file
FileCreatedEvent("~$" + ipath(1)), # backup
]
cleaned_events = sync._clean_local_events(file_events)
assert set(cleaned_events) == set(res)
def test_type_changes():
file_events = [
# keep as is
FileDeletedEvent(ipath(1)),
DirCreatedEvent(ipath(1)),
# keep as is
DirDeletedEvent(ipath(2)),
FileCreatedEvent(ipath(2)),
]
res = [
# keep as is
FileDeletedEvent(ipath(1)),
DirCreatedEvent(ipath(1)),
# keep as is
DirDeletedEvent(ipath(2)),
FileCreatedEvent(ipath(2)),
]
cleaned_events = sync._clean_local_events(file_events)
assert set(cleaned_events) == set(res)
def test_type_changes_difficult():
file_events = [
# convert to FileDeleted -> DirCreated
FileModifiedEvent(ipath(1)),
FileDeletedEvent(ipath(1)),
FileCreatedEvent(ipath(1)),
FileDeletedEvent(ipath(1)),
DirCreatedEvent(ipath(1)),
# convert to FileDeleted(path1) -> DirCreated(path2)
FileModifiedEvent(ipath(2)),
FileDeletedEvent(ipath(2)),
FileCreatedEvent(ipath(2)),
FileDeletedEvent(ipath(2)),
DirCreatedEvent(ipath(2)),
DirMovedEvent(ipath(2), ipath(3)),
]
res = [
FileDeletedEvent(ipath(1)),
DirCreatedEvent(ipath(1)),
FileDeletedEvent(ipath(2)),
DirCreatedEvent(ipath(3)),
]
cleaned_events = sync._clean_local_events(file_events)
assert set(cleaned_events) == set(res)
def test_nested_events():
file_events = [
# convert to a single DirDeleted
DirDeletedEvent(ipath(1)),
FileDeletedEvent(ipath(1) + "/file1.txt"),
FileDeletedEvent(ipath(1) + "/file2.txt"),
DirDeletedEvent(ipath(1) + "/sub"),
FileDeletedEvent(ipath(1) + "/sub/file3.txt"),
# convert to a single DirMoved
DirMovedEvent(ipath(2), ipath(3)),
FileMovedEvent(ipath(2) + "/file1.txt", ipath(3) + "/file1.txt"),
FileMovedEvent(ipath(2) + "/file2.txt", ipath(3) + "/file2.txt"),
DirMovedEvent(ipath(2) + "/sub", ipath(3) + "/sub"),
FileMovedEvent(ipath(2) + "/sub/file3.txt", ipath(3) + "/sub/file3.txt"),
]
res = [
DirDeletedEvent(ipath(1)),
DirMovedEvent(ipath(2), ipath(3)),
]
cleaned_events = sync._clean_local_events(file_events)
assert set(cleaned_events) == set(res)
def test_performance():
# 10,000 nested deleted events (5,000 folders, 5,000 files)
file_events = [DirDeletedEvent(n * ipath(1)) for n in range(1, 5001)]
file_events += [FileDeletedEvent(n * ipath(1) + ".txt") for n in range(1, 5001)]
# 10,000 nested moved events (5,000 folders, 5,000 files)
file_events += [DirMovedEvent(n * ipath(2), n * ipath(3)) for n in range(1, 5001)]
file_events += [
FileMovedEvent(n * ipath(2) + ".txt", n * ipath(3) + ".txt")
for n in range(1, 5001)
]
# 4,995 unrelated created events
file_events += [FileCreatedEvent(ipath(n)) for n in range(5, 5001)]
res = [
DirDeletedEvent(ipath(1)),
DirMovedEvent(ipath(2), ipath(3)),
FileDeletedEvent(ipath(1) + ".txt"),
FileMovedEvent(ipath(2) + ".txt", ipath(3) + ".txt"),
]
res += [FileCreatedEvent(ipath(n)) for n in range(5, 5001)]
cleaned_events = sync._clean_local_events(file_events)
assert set(cleaned_events) == set(res)
n_loops = 4
duration = timeit.timeit(
lambda: sync._clean_local_events(file_events), number=n_loops
)
assert duration < 10 * n_loops

View File

@ -1,3 +1,5 @@
# -*- coding: utf-8 -*-
import sys import sys
import os import os
import time import time
@ -160,6 +162,7 @@ def test_locking_multiprocess():
# daemon lifecycle tests # daemon lifecycle tests
@pytest.mark.skipif(os.environ.get("CI") == "true", reason="Test is flaky on Github")
def test_lifecycle_detached(config_name): def test_lifecycle_detached(config_name):
# start daemon process # start daemon process
@ -183,6 +186,7 @@ def test_lifecycle_detached(config_name):
assert res is Stop.NotRunning assert res is Stop.NotRunning
@pytest.mark.skipif(os.environ.get("CI") == "true", reason="Test is flaky on Github")
def test_lifecycle_attached(config_name): def test_lifecycle_attached(config_name):
# start daemon process # start daemon process
@ -206,6 +210,7 @@ def test_lifecycle_attached(config_name):
# proxy tests # proxy tests
@pytest.mark.skipif(os.environ.get("CI") == "true", reason="Test is flaky on Github")
def test_connection(config_name): def test_connection(config_name):
# start daemon process # start daemon process
@ -223,6 +228,7 @@ def test_connection(config_name):
assert res is Stop.Ok assert res is Stop.Ok
@pytest.mark.skipif(os.environ.get("CI") == "true", reason="Test is flaky on Github")
def test_fallback(config_name): def test_fallback(config_name):
# create proxy w/o fallback # create proxy w/o fallback
@ -236,6 +242,7 @@ def test_fallback(config_name):
assert isinstance(m._m, Maestral) assert isinstance(m._m, Maestral)
@pytest.mark.skipif(os.environ.get("CI") == "true", reason="Test is flaky on Github")
def test_remote_exceptions(config_name): def test_remote_exceptions(config_name):
# start daemon process # start daemon process

View File

@ -0,0 +1,117 @@
# -*- coding: utf-8 -*-
import os
import os.path as osp
from pathlib import Path
from threading import Event
import pytest
from maestral.sync import DirCreatedEvent, DirMovedEvent
from maestral.sync import delete, move
from maestral.sync import SyncEngine, DropboxClient, Observer, FSEventHandler
from maestral.sync import SyncDirection, ItemType, ChangeType
from maestral.utils.appdirs import get_home_dir
from maestral.utils.housekeeping import remove_configuration
def ipath(i):
"""Returns path names '/test 1', '/test 2', ... """
return f"/test {i}"
@pytest.fixture
def sync():
syncing = Event()
startup = Event()
syncing.set()
local_dir = osp.join(get_home_dir(), "dummy_dir")
os.mkdir(local_dir)
sync = SyncEngine(DropboxClient("test-config"), FSEventHandler(syncing, startup))
sync.dropbox_path = local_dir
observer = Observer()
observer.schedule(sync.fs_events, sync.dropbox_path, recursive=True)
observer.start()
yield sync
observer.stop()
observer.join()
remove_configuration("test-config")
delete(sync.dropbox_path)
def test_receiving_events():
new_dir = Path(sync.dropbox_path, "parent")
new_dir.mkdir()
sync_events, local_cursor = sync.wait_for_local_changes()
assert len(sync_events) == 1
try:
ctime = os.stat(new_dir).st_birthtime
except AttributeError:
ctime = None
event = sync_events[0]
assert event.direction == SyncDirection.Up
assert event.item_type == ItemType.Folder
assert event.change_type == ChangeType.Added
assert event.change_time == ctime
assert event.local_path == str(new_dir)
def test_ignore_tree_creation():
new_dir = Path(sync.dropbox_path, "parent")
with sync.fs_events.ignore(DirCreatedEvent(str(new_dir))):
new_dir.mkdir()
for i in range(10):
file = new_dir / f"test_{i}"
file.touch()
sync_events, local_cursor = sync.wait_for_local_changes()
assert len(sync_events) == 0
def test_ignore_tree_move():
new_dir = Path(sync.dropbox_path, "parent")
new_dir.mkdir()
for i in range(10):
file = new_dir / f"test_{i}"
file.touch()
sync.wait_for_local_changes()
new_dir_1 = Path(sync.dropbox_path, "parent2")
with sync.fs_events.ignore(DirMovedEvent(str(new_dir), str(new_dir_1))):
move(new_dir, new_dir_1)
sync_events, local_cursor = sync.wait_for_local_changes()
assert len(sync_events) == 0
def test_catching_non_ignored_events():
new_dir = Path(sync.dropbox_path, "parent")
with sync.fs_events.ignore(DirCreatedEvent(str(new_dir)), recursive=False):
new_dir.mkdir()
for i in range(10):
# may trigger FileCreatedEvent and FileModifiedVent
file = new_dir / f"test_{i}"
file.touch()
sync_events, local_cursor = sync.wait_for_local_changes()
assert all(not si.is_directory for si in sync_events)

View File

@ -1,386 +0,0 @@
# -*- coding: utf-8 -*-
"""
@author: Sam Schott (ss2151@cam.ac.uk)
(c) Sam Schott; This work is licensed under the MIT licence.
"""
import os
import os.path as osp
from pathlib import Path
from threading import Event
import timeit
from unittest import TestCase
from maestral.sync import (
FileCreatedEvent,
FileDeletedEvent,
FileModifiedEvent,
FileMovedEvent,
DirCreatedEvent,
DirDeletedEvent,
DirMovedEvent,
)
from maestral.sync import delete, move
from maestral.sync import SyncEngine, DropboxClient, Observer, FSEventHandler
from maestral.sync import SyncDirection, ItemType, ChangeType
from maestral.utils.appdirs import get_home_dir
from maestral.utils.housekeeping import remove_configuration
def ipath(i):
"""Returns path names '/test 1', '/test 2', ... """
return f"/test {i}"
class TestCleanLocalEvents(TestCase):
def setUp(self):
# noinspection PyTypeChecker
self.sync = SyncEngine(DropboxClient("test-config"), None)
self.sync.dropbox_path = "/"
def tearDown(self):
remove_configuration("test-config")
def test_single_file_events(self):
# only a single event for every path -> no consolidation
file_events = [
FileModifiedEvent(ipath(1)),
FileCreatedEvent(ipath(2)),
FileDeletedEvent(ipath(3)),
FileMovedEvent(ipath(4), ipath(5)),
]
res = [
FileModifiedEvent(ipath(1)),
FileCreatedEvent(ipath(2)),
FileDeletedEvent(ipath(3)),
FileMovedEvent(ipath(4), ipath(5)),
]
cleaned_events = self.sync._clean_local_events(file_events)
self.assertEqual(set(cleaned_events), set(res))
def test_single_path_cases(self):
file_events = [
# created + deleted -> None
FileCreatedEvent(ipath(1)),
FileDeletedEvent(ipath(1)),
# deleted + created -> modified
FileDeletedEvent(ipath(2)),
FileCreatedEvent(ipath(2)),
# created + modified -> created
FileCreatedEvent(ipath(3)),
FileModifiedEvent(ipath(3)),
]
res = [
# created + deleted -> None
# deleted + created -> modified
FileModifiedEvent(ipath(2)),
# created + modified -> created
FileCreatedEvent(ipath(3)),
]
cleaned_events = self.sync._clean_local_events(file_events)
self.assertEqual(set(cleaned_events), set(res))
def test_move_events(self):
file_events = [
# created + moved -> created
FileCreatedEvent(ipath(1)),
FileMovedEvent(ipath(1), ipath(2)),
# moved + deleted -> deleted
FileMovedEvent(ipath(1), ipath(4)),
FileDeletedEvent(ipath(4)),
# moved + moved back -> modified
FileMovedEvent(ipath(5), ipath(6)),
FileMovedEvent(ipath(6), ipath(5)),
# moved + moved -> deleted + created
# (this is currently not handled as a single moved)
FileMovedEvent(ipath(7), ipath(8)),
FileMovedEvent(ipath(8), ipath(9)),
]
res = [
# created + moved -> created
FileCreatedEvent(ipath(2)),
# moved + deleted -> deleted
FileDeletedEvent(ipath(1)),
# moved + moved back -> modified
FileModifiedEvent(ipath(5)),
# moved + moved -> deleted + created
# (this is currently not handled as a single moved)
FileDeletedEvent(ipath(7)),
FileCreatedEvent(ipath(9)),
]
cleaned_events = self.sync._clean_local_events(file_events)
self.assertEqual(set(cleaned_events), set(res))
def test_gedit_save(self):
file_events = [
FileCreatedEvent(".gedit-save-UR4EC0"), # save new version to tmp file
FileModifiedEvent(".gedit-save-UR4EC0"), # modify tmp file
FileMovedEvent(ipath(1), ipath(1) + "~"), # move old version to backup
FileMovedEvent(
".gedit-save-UR4EC0", ipath(1)
), # replace old version with tmp
]
res = [
FileModifiedEvent(ipath(1)), # modified file
FileCreatedEvent(ipath(1) + "~"), # backup
]
cleaned_events = self.sync._clean_local_events(file_events)
self.assertEqual(set(cleaned_events), set(res))
def test_macos_safe_save(self):
file_events = [
FileMovedEvent(
ipath(1), ipath(1) + ".sb-b78ef837-dLht38"
), # move to backup
FileCreatedEvent(ipath(1)), # create new version
FileDeletedEvent(ipath(1) + ".sb-b78ef837-dLht38"), # delete backup
]
res = [
FileModifiedEvent(ipath(1)), # modified file
]
cleaned_events = self.sync._clean_local_events(file_events)
self.assertEqual(set(cleaned_events), set(res))
def test_msoffice_created(self):
file_events = [
FileCreatedEvent(ipath(1)),
FileDeletedEvent(ipath(1)),
FileCreatedEvent(ipath(1)),
FileCreatedEvent("~$" + ipath(1)),
]
res = [
FileCreatedEvent(ipath(1)), # created file
FileCreatedEvent("~$" + ipath(1)), # backup
]
cleaned_events = self.sync._clean_local_events(file_events)
self.assertEqual(set(cleaned_events), set(res))
def test_type_changes(self):
file_events = [
# keep as is
FileDeletedEvent(ipath(1)),
DirCreatedEvent(ipath(1)),
# keep as is
DirDeletedEvent(ipath(2)),
FileCreatedEvent(ipath(2)),
]
res = [
# keep as is
FileDeletedEvent(ipath(1)),
DirCreatedEvent(ipath(1)),
# keep as is
DirDeletedEvent(ipath(2)),
FileCreatedEvent(ipath(2)),
]
cleaned_events = self.sync._clean_local_events(file_events)
self.assertEqual(set(cleaned_events), set(res))
def test_type_changes_difficult(self):
file_events = [
# convert to FileDeleted -> DirCreated
FileModifiedEvent(ipath(1)),
FileDeletedEvent(ipath(1)),
FileCreatedEvent(ipath(1)),
FileDeletedEvent(ipath(1)),
DirCreatedEvent(ipath(1)),
# convert to FileDeleted(path1) -> DirCreated(path2)
FileModifiedEvent(ipath(2)),
FileDeletedEvent(ipath(2)),
FileCreatedEvent(ipath(2)),
FileDeletedEvent(ipath(2)),
DirCreatedEvent(ipath(2)),
DirMovedEvent(ipath(2), ipath(3)),
]
res = [
FileDeletedEvent(ipath(1)),
DirCreatedEvent(ipath(1)),
FileDeletedEvent(ipath(2)),
DirCreatedEvent(ipath(3)),
]
cleaned_events = self.sync._clean_local_events(file_events)
self.assertEqual(set(cleaned_events), set(res))
def test_nested_events(self):
file_events = [
# convert to a single DirDeleted
DirDeletedEvent(ipath(1)),
FileDeletedEvent(ipath(1) + "/file1.txt"),
FileDeletedEvent(ipath(1) + "/file2.txt"),
DirDeletedEvent(ipath(1) + "/sub"),
FileDeletedEvent(ipath(1) + "/sub/file3.txt"),
# convert to a single DirMoved
DirMovedEvent(ipath(2), ipath(3)),
FileMovedEvent(ipath(2) + "/file1.txt", ipath(3) + "/file1.txt"),
FileMovedEvent(ipath(2) + "/file2.txt", ipath(3) + "/file2.txt"),
DirMovedEvent(ipath(2) + "/sub", ipath(3) + "/sub"),
FileMovedEvent(ipath(2) + "/sub/file3.txt", ipath(3) + "/sub/file3.txt"),
]
res = [
DirDeletedEvent(ipath(1)),
DirMovedEvent(ipath(2), ipath(3)),
]
cleaned_events = self.sync._clean_local_events(file_events)
self.assertEqual(set(cleaned_events), set(res))
def test_performance(self):
# 10,000 nested deleted events (5,000 folders, 5,000 files)
file_events = [DirDeletedEvent(n * ipath(1)) for n in range(1, 5001)]
file_events += [FileDeletedEvent(n * ipath(1) + ".txt") for n in range(1, 5001)]
# 10,000 nested moved events (5,000 folders, 5,000 files)
file_events += [
DirMovedEvent(n * ipath(2), n * ipath(3)) for n in range(1, 5001)
]
file_events += [
FileMovedEvent(n * ipath(2) + ".txt", n * ipath(3) + ".txt")
for n in range(1, 5001)
]
# 4,995 unrelated created events
file_events += [FileCreatedEvent(ipath(n)) for n in range(5, 5001)]
res = [
DirDeletedEvent(ipath(1)),
DirMovedEvent(ipath(2), ipath(3)),
FileDeletedEvent(ipath(1) + ".txt"),
FileMovedEvent(ipath(2) + ".txt", ipath(3) + ".txt"),
]
res += [FileCreatedEvent(ipath(n)) for n in range(5, 5001)]
cleaned_events = self.sync._clean_local_events(file_events)
self.assertEqual(set(cleaned_events), set(res))
n_loops = 4
duration = timeit.timeit(
lambda: self.sync._clean_local_events(file_events), number=n_loops
)
self.assertLess(duration, 10 * n_loops)
class TestIgnoreLocalEvents(TestCase):
def setUp(self):
syncing = Event()
startup = Event()
syncing.set()
local_dir = osp.join(get_home_dir(), "dummy_dir")
os.mkdir(local_dir)
self.sync = SyncEngine(
DropboxClient("test-config"), FSEventHandler(syncing, startup)
)
self.sync.dropbox_path = local_dir
self.observer = Observer()
self.observer.schedule(
self.sync.fs_events, self.sync.dropbox_path, recursive=True
)
self.observer.start()
def tearDown(self):
self.observer.stop()
self.observer.join()
remove_configuration("test-config")
delete(self.sync.dropbox_path)
def test_receiving_events(self):
new_dir = Path(self.sync.dropbox_path, "parent")
new_dir.mkdir()
sync_events, local_cursor = self.sync.wait_for_local_changes()
self.assertEqual(len(sync_events), 1)
try:
ctime = os.stat(new_dir).st_birthtime
except AttributeError:
ctime = None
event = sync_events[0]
self.assertEqual(event.direction, SyncDirection.Up)
self.assertEqual(event.item_type, ItemType.Folder)
self.assertEqual(event.change_type, ChangeType.Added)
self.assertEqual(event.change_time, ctime)
self.assertEqual(event.local_path, str(new_dir))
def test_ignore_tree_creation(self):
new_dir = Path(self.sync.dropbox_path, "parent")
with self.sync.fs_events.ignore(DirCreatedEvent(str(new_dir))):
new_dir.mkdir()
for i in range(10):
file = new_dir / f"test_{i}"
file.touch()
sync_events, local_cursor = self.sync.wait_for_local_changes()
self.assertEqual(len(sync_events), 0)
def test_ignore_tree_move(self):
new_dir = Path(self.sync.dropbox_path, "parent")
new_dir.mkdir()
for i in range(10):
file = new_dir / f"test_{i}"
file.touch()
self.sync.wait_for_local_changes()
new_dir_1 = Path(self.sync.dropbox_path, "parent2")
with self.sync.fs_events.ignore(DirMovedEvent(str(new_dir), str(new_dir_1))):
move(new_dir, new_dir_1)
sync_events, local_cursor = self.sync.wait_for_local_changes()
self.assertEqual(len(sync_events), 0)
def test_catching_non_ignored_events(self):
new_dir = Path(self.sync.dropbox_path, "parent")
with self.sync.fs_events.ignore(DirCreatedEvent(str(new_dir)), recursive=False):
new_dir.mkdir()
for i in range(10):
# may trigger FileCreatedEvent and FileModifiedVent
file = new_dir / f"test_{i}"
file.touch()
sync_events, local_cursor = self.sync.wait_for_local_changes()
self.assertTrue(all(not si.is_directory for si in sync_events))

View File

@ -1,10 +1,5 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
"""
@author: Sam Schott (ss2151@cam.ac.uk)
(c) Sam Schott; This work is licensed under the MIT licence.
"""
import platform import platform
from maestral.utils.appdirs import ( from maestral.utils.appdirs import (

View File

@ -1,10 +1,5 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
"""
@author: Sam Schott (ss2151@cam.ac.uk)
(c) Sam Schott; This work is licensed under the MIT licence.
"""
import os.path as osp import os.path as osp
import tempfile import tempfile

View File

@ -1,10 +1,5 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
"""
@author: Sam Schott (ss2151@cam.ac.uk)
(c) Sam Schott; This work is licensed under the MIT licence.
"""
import builtins import builtins
import pytest import pytest

View File

@ -1,10 +1,5 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
"""
@author: Sam Schott (ss2151@cam.ac.uk)
(c) Sam Schott; This work is licensed under the MIT licence.
"""
import pytest import pytest
from maestral.utils import get_newer_version from maestral.utils import get_newer_version