add type annotations to more of the integration tests

Summary:
We already had type annotations on most of the `hg` integration tests.  This
adds them for the top-level (non-source-control-specific) tests.

typeseverywhere

Reviewed By: wez

Differential Revision: D7459281

fbshipit-source-id: 41266b232ded510d6b63dd3e62c272a0cd6a0e1a
This commit is contained in:
Adam Simpkins 2018-04-04 17:31:28 -07:00 committed by Facebook Github Bot
parent 13b1502424
commit 398a824aac
26 changed files with 274 additions and 210 deletions

View File

@ -23,7 +23,7 @@ class BasicTest(testcase.EdenRepoTest):
about the sample git repo and that it is correct are all
things that are appropriate to include in this test case.
'''
def populate_repo(self):
def populate_repo(self) -> None:
self.repo.write_file('hello', 'hola\n')
self.repo.write_file('adir/file', 'foo!\n')
self.repo.write_file('bdir/test.sh', '#!/bin/bash\necho test\n',
@ -36,7 +36,7 @@ class BasicTest(testcase.EdenRepoTest):
'.eden', 'adir', 'bdir', 'hello', 'slink'
])
def test_version(self):
def test_version(self) -> None:
output = self.eden.run_cmd('version', cwd=self.mount)
lines = output.splitlines()
@ -57,7 +57,7 @@ class BasicTest(testcase.EdenRepoTest):
running_info = lines[1]
self.assertTrue(running_info.startswith('Running: '))
def test_fileList(self):
def test_fileList(self) -> None:
entries = set(os.listdir(self.mount))
self.assertEqual(self.expected_mount_entries, entries)
@ -75,16 +75,16 @@ class BasicTest(testcase.EdenRepoTest):
st = os.lstat(slink)
self.assertTrue(stat.S_ISLNK(st.st_mode))
def test_symlinks(self):
def test_symlinks(self) -> None:
slink = os.path.join(self.mount, 'slink')
self.assertEqual(os.readlink(slink), 'hello')
def test_regular(self):
def test_regular(self) -> None:
hello = os.path.join(self.mount, 'hello')
with open(hello, 'r') as f:
self.assertEqual('hola\n', f.read())
def test_dir(self):
def test_dir(self) -> None:
entries = sorted(os.listdir(os.path.join(self.mount, 'adir')))
self.assertEqual(['file'], entries)
@ -92,7 +92,7 @@ class BasicTest(testcase.EdenRepoTest):
with open(filename, 'r') as f:
self.assertEqual('foo!\n', f.read())
def test_create(self):
def test_create(self) -> None:
filename = os.path.join(self.mount, 'notinrepo')
with open(filename, 'w') as f:
f.write('created\n')
@ -107,7 +107,7 @@ class BasicTest(testcase.EdenRepoTest):
self.assertEqual(st.st_size, 8)
self.assertTrue(stat.S_ISREG(st.st_mode))
def test_overwrite(self):
def test_overwrite(self) -> None:
hello = os.path.join(self.mount, 'hello')
with open(hello, 'w') as f:
f.write('replaced\n')
@ -115,7 +115,7 @@ class BasicTest(testcase.EdenRepoTest):
st = os.lstat(hello)
self.assertEqual(st.st_size, len('replaced\n'))
def test_append(self):
def test_append(self) -> None:
hello = os.path.join(self.mount, 'bdir/test.sh')
with open(hello, 'a') as f:
f.write('echo more commands\n')
@ -127,7 +127,7 @@ class BasicTest(testcase.EdenRepoTest):
self.assertEqual(expected_data, read_back)
self.assertEqual(len(expected_data), st.st_size)
def test_materialize(self):
def test_materialize(self) -> None:
hello = os.path.join(self.mount, 'hello')
# Opening for write should materialize the file with the same
# contents that we expect
@ -137,7 +137,7 @@ class BasicTest(testcase.EdenRepoTest):
st = os.lstat(hello)
self.assertEqual(st.st_size, len('hola\n'))
def test_mkdir(self):
def test_mkdir(self) -> None:
# Can't create a directory inside a file that is in the store
with self.assertRaises(OSError) as context:
os.mkdir(os.path.join(self.mount, 'hello', 'world'))
@ -174,8 +174,8 @@ class BasicTest(testcase.EdenRepoTest):
st = os.lstat(deep_file)
self.assertTrue(stat.S_ISREG(st.st_mode))
def test_access(self):
def check_access(path, mode):
def test_access(self) -> None:
def check_access(path: str, mode: int) -> bool:
return os.access(os.path.join(self.mount, path), mode)
self.assertTrue(check_access('hello', os.R_OK))
@ -201,7 +201,7 @@ class BasicTest(testcase.EdenRepoTest):
msg='attempting to run noexec.sh should fail with '
'EACCES')
def test_unmount(self):
def test_unmount(self) -> None:
entries = set(os.listdir(self.mount))
self.assertEqual(self.expected_mount_entries, entries)
@ -219,7 +219,7 @@ class BasicTest(testcase.EdenRepoTest):
self.assertTrue(self.eden.in_proc_mounts(self.mount))
def test_unmount_remount(self):
def test_unmount_remount(self) -> None:
# write a file into the overlay to test that it is still visible
# when we remount.
filename = os.path.join(self.mount, 'overlayonly')
@ -247,7 +247,7 @@ class BasicTest(testcase.EdenRepoTest):
with open(filename, 'r') as f:
self.assertEqual('foo!\n', f.read(), msg='overlay file is correct')
def test_double_unmount(self):
def test_double_unmount(self) -> None:
# Test calling "unmount" twice. The second should fail, but edenfs
# should still work normally afterwards
self.eden.run_cmd('unmount', self.mount)
@ -260,7 +260,7 @@ class BasicTest(testcase.EdenRepoTest):
entries = sorted(os.listdir(self.mount))
self.assertEqual(['.eden', 'adir', 'bdir', 'hello', 'slink'], entries)
def test_statvfs(self):
def test_statvfs(self) -> None:
hello_path = os.path.join(self.mount, 'hello')
fs_info = os.statvfs(hello_path)
self.assertGreaterEqual(fs_info.f_namemax, 255)

View File

@ -26,11 +26,11 @@ repo_name = 'main'
@testcase.eden_repo_test
class CloneTest(testcase.EdenRepoTest):
def populate_repo(self):
def populate_repo(self) -> None:
self.repo.write_file('hello', 'hola\n')
self.repo.commit('Initial commit.')
def test_clone_to_non_existent_directory(self):
def test_clone_to_non_existent_directory(self) -> None:
tmp = self._new_tmp_dir()
non_existent_dir = os.path.join(tmp, 'foo/bar/baz')
@ -38,7 +38,7 @@ class CloneTest(testcase.EdenRepoTest):
self.assertTrue(os.path.isfile(os.path.join(non_existent_dir, 'hello')),
msg='clone should succeed in non-existent directory')
def test_clone_to_existing_empty_directory(self):
def test_clone_to_existing_empty_directory(self) -> None:
tmp = self._new_tmp_dir()
empty_dir = os.path.join(tmp, 'foo/bar/baz')
os.makedirs(empty_dir)
@ -47,7 +47,7 @@ class CloneTest(testcase.EdenRepoTest):
self.assertTrue(os.path.isfile(os.path.join(empty_dir, 'hello')),
msg='clone should succeed in empty directory')
def test_clone_from_repo(self):
def test_clone_from_repo(self) -> None:
# Specify the source of the clone as an existing local repo rather than
# an alias for a config.
destination_dir = self._new_tmp_dir()
@ -55,7 +55,7 @@ class CloneTest(testcase.EdenRepoTest):
self.assertTrue(os.path.isfile(os.path.join(destination_dir, 'hello')),
msg='clone should succeed in empty directory')
def test_clone_from_eden_repo(self):
def test_clone_from_eden_repo(self) -> None:
# Add a config alias for a repo with some bind mounts.
edenrc = os.path.join(os.environ['HOME'], '.edenrc')
with open(edenrc, 'w') as f:
@ -87,7 +87,7 @@ class CloneTest(testcase.EdenRepoTest):
msg='clone should inherit its config from eden_clone1, '
'which should include the bind mounts.')
def test_clone_with_valid_revision_cmd_line_arg_works(self):
def test_clone_with_valid_revision_cmd_line_arg_works(self) -> None:
tmp = self._new_tmp_dir()
target = os.path.join(tmp, 'foo/bar/baz')
self.eden.run_cmd('clone', '--snapshot', self.repo.get_head_hash(),
@ -95,7 +95,7 @@ class CloneTest(testcase.EdenRepoTest):
self.assertTrue(os.path.isfile(os.path.join(target, 'hello')),
msg='clone should succeed with --snapshop arg.')
def test_clone_with_short_revision_cmd_line_arg_works(self):
def test_clone_with_short_revision_cmd_line_arg_works(self) -> None:
tmp = self._new_tmp_dir()
target = os.path.join(tmp, 'foo/bar/baz')
short = self.repo.get_head_hash()[:6]
@ -103,7 +103,7 @@ class CloneTest(testcase.EdenRepoTest):
self.assertTrue(os.path.isfile(os.path.join(target, 'hello')),
msg='clone should succeed with short --snapshop arg.')
def test_clone_to_non_empty_directory_fails(self):
def test_clone_to_non_empty_directory_fails(self) -> None:
tmp = self._new_tmp_dir()
non_empty_dir = os.path.join(tmp, 'foo/bar/baz')
os.makedirs(non_empty_dir)
@ -116,7 +116,7 @@ class CloneTest(testcase.EdenRepoTest):
self.assertIn(os.strerror(errno.ENOTEMPTY), stderr,
msg='clone into non-empty dir should raise ENOTEMPTY')
def test_clone_with_invalid_revision_cmd_line_arg_fails(self):
def test_clone_with_invalid_revision_cmd_line_arg_fails(self) -> None:
tmp = self._new_tmp_dir()
empty_dir = os.path.join(tmp, 'foo/bar/baz')
os.makedirs(empty_dir)
@ -127,7 +127,7 @@ class CloneTest(testcase.EdenRepoTest):
self.assertIn('Obtained commit for repo is invalid', stderr,
msg='passing invalid commit on cmd line should fail')
def test_clone_to_file_fails(self):
def test_clone_to_file_fails(self) -> None:
tmp = self._new_tmp_dir()
non_empty_dir = os.path.join(tmp, 'foo/bar/baz')
os.makedirs(non_empty_dir)
@ -141,7 +141,9 @@ class CloneTest(testcase.EdenRepoTest):
self.assertIn(os.strerror(errno.ENOTDIR), stderr,
msg='clone into file should raise ENOTDIR')
def test_clone_to_non_existent_directory_that_is_under_a_file_fails(self):
def test_clone_to_non_existent_directory_that_is_under_a_file_fails(
self
) -> None:
tmp = self._new_tmp_dir()
non_existent_dir = os.path.join(tmp, 'foo/bar/baz')
with open(os.path.join(tmp, 'foo'), 'w') as f:
@ -154,7 +156,7 @@ class CloneTest(testcase.EdenRepoTest):
msg='When the directory cannot be created because the '
'ancestor is a parent, clone should raise ENOTDIR')
def test_post_clone_hook(self):
def test_post_clone_hook(self) -> None:
edenrc = os.path.join(os.environ['HOME'], '.edenrc')
hooks_dir = os.path.join(self.tmp_dir, 'the_hooks')
os.mkdir(hooks_dir)
@ -196,7 +198,7 @@ echo -n "$1" >> "{scratch_file}"
self.eden.start()
self.assertEqual(new_contents, util.read_all(scratch_file))
def test_attempt_clone_invalid_repo_name(self):
def test_attempt_clone_invalid_repo_name(self) -> None:
tmp = self._new_tmp_dir()
repo_name = 'repo-name-that-is-not-in-the-config'
@ -251,5 +253,5 @@ echo -n "$1" >> "{scratch_file}"
msg='Eden should have two mounts.')
self.assertEqual('hola\n', util.read_all(os.path.join(tmp, 'hello')))
def _new_tmp_dir(self):
def _new_tmp_dir(self) -> str:
return tempfile.mkdtemp(dir=self.tmp_dir)

View File

@ -15,11 +15,11 @@ import time
@testcase.eden_repo_test
class DebugGetPathTest(testcase.EdenRepoTest):
def populate_repo(self):
def populate_repo(self) -> None:
self.repo.write_file('hello', 'hola\n')
self.repo.commit('Initial commit.')
def test_getpath_root_inode(self):
def test_getpath_root_inode(self) -> None:
'''
Test that calling `eden debug getpath 1` returns the path to the eden
mount, and indicates that the inode is loaded.
@ -32,10 +32,10 @@ class DebugGetPathTest(testcase.EdenRepoTest):
self.assertEqual('loaded ' + self.mount + '\n', output)
def test_getpath_dot_eden_inode(self):
def test_getpath_dot_eden_inode(self) -> None:
'''
Test that calling `eden debug getpath ${ino}` returns the path to the .eden
directory, and indicates that the inode is loaded.
Test that calling `eden debug getpath ${ino}` returns the path to the
.eden directory, and indicates that the inode is loaded.
'''
st = os.lstat(os.path.join(self.mount, '.eden'))
self.assertTrue(stat.S_ISDIR(st.st_mode))
@ -51,7 +51,7 @@ class DebugGetPathTest(testcase.EdenRepoTest):
'loaded ' + os.path.join(self.mount, '.eden') + '\n',
output)
def test_getpath_invalid_inode(self):
def test_getpath_invalid_inode(self) -> None:
'''
Test that calling `eden debug getpath 1234` raises an error since
1234 is not a valid inode number
@ -65,7 +65,7 @@ class DebugGetPathTest(testcase.EdenRepoTest):
self.assertIn('unknown inode number 1234',
context.exception.stderr.decode())
def test_getpath_unloaded_inode(self):
def test_getpath_unloaded_inode(self) -> None:
'''
Test that calling `eden debug getpath` on an unloaded inode returns the
correct path and indicates that it is unloaded
@ -90,7 +90,7 @@ class DebugGetPathTest(testcase.EdenRepoTest):
f'unloaded {filepath}\n',
output)
def test_getpath_unloaded_inode_rename_parent(self):
def test_getpath_unloaded_inode_rename_parent(self) -> None:
'''
Test that when an unloaded inode has one of its parents renamed,
`eden debug getpath` returns the new path
@ -118,7 +118,7 @@ class DebugGetPathTest(testcase.EdenRepoTest):
os.path.join(self.mount, 'newname', 'bar', 'test.txt') + '\n',
output)
def unload_one_inode_under(self, path):
def unload_one_inode_under(self, path: str) -> None:
# TODO: To support unloading more than one inode, sum the return value
# until count is reached our the attempt limit has been reached.
remaining_attempts = 5
@ -135,7 +135,7 @@ class DebugGetPathTest(testcase.EdenRepoTest):
time.sleep(1)
continue
def test_getpath_unlinked_inode(self):
def test_getpath_unlinked_inode(self) -> None:
'''
Test that when an inode is unlinked, `eden debug getpath` indicates
that it is unlinked

View File

@ -11,12 +11,12 @@ from .lib import edenclient, testcase
class HealthTest(testcase.EdenTestCase):
def test_is_healthy(self):
def test_is_healthy(self) -> None:
self.assertTrue(self.eden.is_healthy())
self.eden.shutdown()
self.assertFalse(self.eden.is_healthy())
def test_disconnected_daemon_is_not_healthy(self):
def test_disconnected_daemon_is_not_healthy(self) -> None:
# Create a new edenfs instance that is never started, and make sure
# it is not healthy.
with edenclient.EdenFS() as client:

View File

@ -17,7 +17,7 @@ class HelpTest(unittest.TestCase):
It can be removed when the remaining integration tests are enabled
on sandcastle.
"""
def test_eden_cli_help_returns_without_error(self):
def test_eden_cli_help_returns_without_error(self) -> None:
with edenclient.EdenFS() as client:
return_code = client.run_unchecked('help')
self.assertEqual(0, return_code)

View File

@ -102,7 +102,6 @@ class EdenHgTestCase(testcase.EdenTestCase):
# Edit the edenrc file to set up post-clone hooks that will correctly
# populate the .hg directory inside the eden client.
self.amend_edenrc_before_clone()
self.mount = os.path.join(self.mounts_dir, 'main')
self.eden.clone(self.backing_repo_name, self.mount, allow_empty=True)
# Now create the repository object that refers to the eden client

View File

@ -11,17 +11,13 @@ from .lib import testcase
import json
import os
# This is the name of the default repository created by EdenRepoTestBase.
repo_name = 'main'
@testcase.eden_repo_test
class InfoTest(testcase.EdenRepoTest):
def populate_repo(self):
def populate_repo(self) -> None:
self.repo.write_file('hello', 'hola\n')
self.repo.commit('Initial commit.')
def test_info_with_bind_mounts(self):
def test_info_with_bind_mounts(self) -> None:
edenrc = os.path.join(os.environ['HOME'], '.edenrc')
with open(edenrc, 'w') as f:
f.write(
@ -33,7 +29,7 @@ type = {repo_type}
[bindmounts {repo_name}]
buck-out = buck-out
'''.format(
repo_name=repo_name,
repo_name=self.repo_name,
repo_path=self.repo.get_canonical_root(),
repo_type=self.repo.get_type()
)
@ -42,7 +38,7 @@ buck-out = buck-out
basename = 'eden_mount'
tmp = os.path.join(self.tmp_dir, basename)
self.eden.run_cmd('clone', repo_name, tmp)
self.eden.run_cmd('clone', self.repo_name, tmp)
info = self.eden.run_cmd('info', tmp)
client_info = json.loads(info)
@ -59,7 +55,7 @@ buck-out = buck-out
}, client_info
)
def test_relative_path(self):
def test_relative_path(self) -> None:
'''
Test calling "eden info <relative_path>" and make sure it gives
the expected results.
@ -80,7 +76,7 @@ buck-out = buck-out
}, client_info
)
def test_through_symlink(self):
def test_through_symlink(self) -> None:
'''
Test calling "eden info" through a symlink and make sure it gives
the expected results. This makes sure "eden info" resolves the path

View File

@ -10,14 +10,24 @@
import json
import os
import subprocess
import typing
from .find_executables import FSATTR
def getxattr(abspath, attr):
raw_stdout = subprocess.check_output([FSATTR, '--attrName', attr, '--fileName', abspath])
return json.loads(raw_stdout)
def getxattr(abspath: str, attr: str) -> str:
raw_stdout = subprocess.check_output(
[FSATTR, '--attrName', attr, '--fileName', abspath]
)
result = json.loads(raw_stdout)
# fsattr should always return a string here. We just cast the result,
# without actually validating it for now.
return typing.cast(str, result)
def listxattr(abspath):
def listxattr(abspath: str) -> typing.Dict[str, str]:
raw_stdout = subprocess.check_output([FSATTR, '--fileName', abspath])
return json.loads(raw_stdout)
result = json.loads(raw_stdout)
# fsattr should always return a dictionary here. We just cast the
# result, without actually validating it for now.
return typing.cast(typing.Dict[str, str], result)

View File

@ -48,6 +48,18 @@ class Repository(object):
'''Returns the 40-character hex hash for HEAD.'''
raise NotImplementedError('subclasses must implement get_head_hash()')
def commit(self,
message: str,
author_name: Optional[str]=None,
author_email: Optional[str]=None,
date: Optional[datetime.datetime]=None,
amend: bool=False) -> str:
'''
Create a commit.
Returns the new commit hash as a 40-character hexadecimal string.
'''
raise NotImplementedError('subclasses must implement commit()')
def add_file(self, path: str) -> None:
self.add_files([path])

View File

@ -23,14 +23,18 @@ from hypothesis.internal.detection import is_hypothesis_test
from hypothesis.configuration import (
set_hypothesis_home_dir,
hypothesis_home_dir)
from typing import Dict, List, Optional
from typing import (
Any, Callable, Dict, Iterable, List, Optional, Sequence, Tuple, Type
)
import eden.thrift
from . import edenclient
from . import hgrepo
from . import gitrepo
from . import repobase
def is_sandcastle():
def is_sandcastle() -> bool:
return 'SANDCASTLE' in os.environ
@ -76,7 +80,7 @@ if not edenclient.can_run_eden():
# This is avoiding a reporting noise issue in our CI that files
# tasks about skipped tests. Let's just skip defining most of them
# to avoid the noise if we know that they won't work anyway.
TestParent = typing.cast(typing.Type[unittest.TestCase], object)
TestParent = typing.cast(Type[unittest.TestCase], object)
else:
TestParent = unittest.TestCase
@ -90,7 +94,10 @@ class EdenTestCase(TestParent):
tearDown().
'''
def run(self, report=None):
def run(
self,
report: Optional[unittest.result.TestResult] = None
) -> unittest.result.TestResult:
''' Some slightly awful magic here to arrange for setUp and
tearDown to be called at the appropriate times when hypothesis
is enabled for a test case.
@ -113,7 +120,7 @@ class EdenTestCase(TestParent):
else:
return super(EdenTestCase, self).run(report)
def report_time(self, event):
def report_time(self, event: str) -> None:
'''
report_time() is a helper function for logging how long different
parts of the test took.
@ -128,7 +135,7 @@ class EdenTestCase(TestParent):
event, since_start, since_last)
self.last_event = now
def setUp(self):
def setUp(self) -> None:
self.start = time.time()
self.last_event = self.start
@ -141,8 +148,8 @@ class EdenTestCase(TestParent):
self.addCleanup(self.report_time, 'clean up started')
def setup_eden_test(self):
def cleanup_tmp_dir():
def setup_eden_test(self) -> None:
def cleanup_tmp_dir() -> None:
if os.environ.get('EDEN_TEST_NO_CLEANUP'):
print('Leaving behind eden test directory %r' % self.tmp_dir)
else:
@ -197,7 +204,9 @@ class EdenTestCase(TestParent):
self.addCleanup(self.eden.cleanup)
self.report_time('eden daemon started')
def get_thrift_client(self):
self.mount = os.path.join(self.mounts_dir, 'main')
def get_thrift_client(self) -> eden.thrift.EdenClient:
'''
Get a thrift client to the edenfs daemon.
'''
@ -223,7 +232,9 @@ class EdenTestCase(TestParent):
'''
return None
def create_repo(self, name, repo_class, **kwargs):
def create_repo(
self, name: str, repo_class: Type[repobase.Repository], **kwargs: Any
) -> repobase.Repository:
'''
Create a new repository.
@ -287,7 +298,7 @@ class EdenTestCase(TestParent):
'''Unlink the file at the specified path relative to the clone.'''
os.unlink(self.get_path(path))
def select_storage_engine(self):
def select_storage_engine(self) -> str:
'''
Prefer to use memory in the integration tests, but allow
the tests that restart to override this and pick something else.
@ -305,12 +316,10 @@ class EdenRepoTest(EdenTestCase):
when subclassing from EdenRepoTest. @eden_repo_test will automatically run
your tests once per supported repository type.
'''
def setup_eden_test(self):
def setup_eden_test(self) -> None:
super().setup_eden_test()
self.repo_name = 'main'
self.mount = os.path.join(self.mounts_dir, self.repo_name)
self.repo = self.create_repo(self.repo_name, self.get_repo_class())
self.populate_repo()
self.report_time('repository setup done')
@ -319,18 +328,24 @@ class EdenRepoTest(EdenTestCase):
self.eden.clone(self.repo_name, self.mount)
self.report_time('eden clone done')
def populate_repo(self):
def populate_repo(self) -> None:
raise NotImplementedError('individual test classes must implement '
'populate_repo()')
def get_repo_class(self):
def get_repo_class(self) -> Type[repobase.Repository]:
raise NotImplementedError('test subclasses must implement '
'get_repo_class(). This is normally '
'implemented automatically by '
'@eden_repo_test')
def _replicate_test(caller_scope, replicate, test_class, args, kwargs):
def _replicate_test(
caller_scope: Dict[str, Any],
replicate: Callable[..., Iterable[Tuple[str, Type[EdenRepoTest]]]],
test_class: Type[EdenRepoTest],
args: Sequence[Any],
kwargs: Dict[str, Any]
) -> None:
for suffix, new_class in replicate(test_class, *args, **kwargs):
# Set the name and module information on our new subclass
name = test_class.__name__ + suffix
@ -342,7 +357,9 @@ def _replicate_test(caller_scope, replicate, test_class, args, kwargs):
caller_scope[name] = new_class
def test_replicator(replicate):
def test_replicator(
replicate: Callable[..., Iterable[Tuple[str, Type[EdenRepoTest]]]]
) -> Callable[..., Any]:
'''
A helper function for implementing decorators that replicate TestCase
classes so that the same test function can be run multiple times with
@ -350,20 +367,27 @@ def test_replicator(replicate):
See the @eden_repo_test decorator for an example of how this is used.
'''
def decorator(*args, **kwargs):
def decorator(
*args: Any,
**kwargs: Any
) -> Optional[Callable[[Type[EdenRepoTest]], None]]:
# We do some rather hacky things here to define new test class types
# in our caller's scope. This is needed so that the unittest TestLoader
# will find the subclasses we define.
caller_scope = inspect.currentframe().f_back.f_locals
current_frame = inspect.currentframe()
if current_frame is None:
raise Exception('we require a python interpreter with '
'stack frame support')
caller_scope = current_frame.f_back.f_locals
if len(args) == 1 and not kwargs and isinstance(args[0], type):
# The decorator was invoked directly with the test class,
# with no arguments or keyword arguments
_replicate_test(caller_scope, replicate, args[0],
args=(), kwargs={})
return
return None
else:
def inner_decorator(test_class):
def inner_decorator(test_class: Type[EdenRepoTest]) -> None:
_replicate_test(caller_scope, replicate, test_class,
args, kwargs)
return inner_decorator
@ -371,7 +395,9 @@ def test_replicator(replicate):
return decorator
def _replicate_eden_repo_test(test_class):
def _replicate_eden_repo_test(
test_class: Type[EdenRepoTest]
) -> Iterable[Tuple[str, Type[EdenRepoTest]]]:
repo_types = [
(hgrepo.HgRepository, 'Hg'),
(gitrepo.GitRepository, 'Git'),
@ -381,7 +407,7 @@ def _replicate_eden_repo_test(test_class):
# Define a new class that derives from the input class
# as well as the repo-specific parent class type
class RepoSpecificTest(test_class):
def get_repo_class(self):
def get_repo_class(self) -> Type[repobase.Repository]:
return repo_class
yield suffix, RepoSpecificTest

View File

@ -10,6 +10,8 @@
import errno
import os
import stat
from typing import Dict
from .lib import testcase
from facebook.eden import EdenService
from facebook.eden.ttypes import FileInformationOrError
@ -21,7 +23,7 @@ INITIAL_SEQ = 5
class MaterializedQueryTest(testcase.EdenRepoTest):
'''Check that materialization is represented correctly.'''
def populate_repo(self):
def populate_repo(self) -> None:
self.repo.write_file('hello', 'hola\n')
self.repo.write_file('adir/file', 'foo!\n')
self.repo.write_file('bdir/test.sh', '#!/bin/bash\necho test\n',
@ -30,16 +32,16 @@ class MaterializedQueryTest(testcase.EdenRepoTest):
self.repo.symlink('slink', 'hello')
self.repo.commit('Initial commit.')
def edenfs_logging_settings(self):
def edenfs_logging_settings(self) -> Dict[str, str]:
return {'eden.fs.fuse.RequestData': 'DBG5'}
def setUp(self):
def setUp(self) -> None:
super().setUp()
self.client = self.get_thrift_client()
self.client.open()
self.addCleanup(self.client.close)
def test_noEntries(self):
def test_noEntries(self) -> None:
pos = self.client.getCurrentJournalPosition(self.mount)
# setting up the .eden dir bumps the sequence number
self.assertEqual(INITIAL_SEQ, pos.sequenceNumber)
@ -52,7 +54,7 @@ class MaterializedQueryTest(testcase.EdenRepoTest):
self.assertEqual(pos, changed.fromPosition)
self.assertEqual(pos, changed.toPosition)
def test_getFileInformation(self):
def test_getFileInformation(self) -> None:
""" verify that getFileInformation is consistent with the VFS """
paths = ['', 'not-exist', 'hello', 'adir',
@ -84,7 +86,7 @@ class MaterializedQueryTest(testcase.EdenRepoTest):
self.assertEqual(e.errno, err.errorCode,
msg='error code matches for ' + path)
def test_invalidProcessGeneration(self):
def test_invalidProcessGeneration(self) -> None:
# Get a candidate position
pos = self.client.getCurrentJournalPosition(self.mount)
@ -96,7 +98,7 @@ class MaterializedQueryTest(testcase.EdenRepoTest):
self.assertEqual(errno.ERANGE, context.exception.errorCode,
msg='Must return ERANGE')
def test_removeFile(self):
def test_removeFile(self) -> None:
initial_pos = self.client.getCurrentJournalPosition(self.mount)
self.assertEqual(INITIAL_SEQ, initial_pos.sequenceNumber)
@ -106,7 +108,7 @@ class MaterializedQueryTest(testcase.EdenRepoTest):
self.assertEqual(set(), set(changed.changedPaths))
self.assertEqual(set(['adir/file']), set(changed.removedPaths))
def test_renameFile(self):
def test_renameFile(self) -> None:
initial_pos = self.client.getCurrentJournalPosition(self.mount)
self.assertEqual(INITIAL_SEQ, initial_pos.sequenceNumber)
@ -116,7 +118,7 @@ class MaterializedQueryTest(testcase.EdenRepoTest):
self.assertEqual(set(), set(changed.changedPaths))
self.assertEqual(set(['hello']), set(changed.removedPaths))
def test_addFile(self):
def test_addFile(self) -> None:
initial_pos = self.client.getCurrentJournalPosition(self.mount)
self.assertEqual(INITIAL_SEQ, initial_pos.sequenceNumber)

View File

@ -30,12 +30,12 @@ libc.munmap.argtypes = [c_void_p, c_size_t]
class MmapTest(testcase.EdenRepoTest):
contents = 'abcdef'
def populate_repo(self):
def populate_repo(self) -> None:
self.repo.write_file('filename', self.contents)
self.repo.commit('Initial commit.')
self.filename = os.path.join(self.mount, 'filename')
def test_mmap_in_backed_file_is_null_terminated(self):
def test_mmap_in_backed_file_is_null_terminated(self) -> None:
fd = os.open(self.filename, os.O_RDONLY)
try:
size = os.fstat(fd).st_size
@ -44,7 +44,9 @@ class MmapTest(testcase.EdenRepoTest):
map_size = (size + 4095) // 4096 * 4096
self.assertNotEqual(size, map_size)
m = libc.mmap(None, map_size, mmap.PROT_READ, mmap.MAP_PRIVATE, fd, 0)
m = libc.mmap(
None, map_size, mmap.PROT_READ, mmap.MAP_PRIVATE, fd, 0
)
try:
# assert the additional mapped bytes are null, per `man 2 mmap`
for i in range(size, map_size):
@ -54,7 +56,9 @@ class MmapTest(testcase.EdenRepoTest):
finally:
os.close(fd)
def test_mmap_is_null_terminated_after_truncate_and_write_to_overlay(self):
def test_mmap_is_null_terminated_after_truncate_and_write_to_overlay(
self
) -> None:
# WARNING: This test is very fiddly.
# The bug is that if a file in Eden is opened with O_TRUNC followed by
@ -96,7 +100,9 @@ class MmapTest(testcase.EdenRepoTest):
map_size = (size + 4095) // 4096 * 4096
self.assertNotEqual(size, map_size)
m = libc.mmap(None, map_size, mmap.PROT_READ, mmap.MAP_PRIVATE, fd, 0)
m = libc.mmap(
None, map_size, mmap.PROT_READ, mmap.MAP_PRIVATE, fd, 0
)
try:
# Assert the additional mapped bytes are null, per `man 2 mmap`
for i in range(size, map_size):

View File

@ -14,11 +14,11 @@ import os
@testcase.eden_repo_test
class OpenExclusiveTest(testcase.EdenRepoTest):
def populate_repo(self):
def populate_repo(self) -> None:
self.repo.write_file('readme.txt', 'test\n')
self.repo.commit('Initial commit.')
def test_oexcl(self):
def test_oexcl(self) -> None:
filename = os.path.join(self.mount, 'makeme')
fd = os.open(filename, os.O_EXCL | os.O_CREAT | os.O_RDWR)

View File

@ -7,22 +7,26 @@
# LICENSE file in the root directory of this source tree. An additional grant
# of patent rights can be found in the PATENTS file in the same directory.
from .lib import testcase
import os
import subprocess
from typing import Dict
from .lib import testcase
@testcase.eden_repo_test
class PatchTest(testcase.EdenRepoTest):
def populate_repo(self):
def populate_repo(self) -> None:
self.repo.write_file('hello', 'hola\n')
self.repo.commit('Initial commit.')
def edenfs_logging_settings(self):
def edenfs_logging_settings(self) -> Dict[str, str]:
return {'eden.strace': 'DBG7', 'eden.fs.fuse': 'DBG7'}
def test_patch(self):
proc = subprocess.Popen(['patch'], cwd=self.mount, stdin=subprocess.PIPE)
def test_patch(self) -> None:
proc = subprocess.Popen(
['patch'], cwd=self.mount, stdin=subprocess.PIPE
)
stdout, stderr = proc.communicate(b'''
--- hello
+++ hello

View File

@ -9,22 +9,23 @@
import os
import unittest
from typing import Dict
from .lib import testcase
@testcase.eden_repo_test
class PersistenceTest(testcase.EdenRepoTest):
def populate_repo(self):
def populate_repo(self) -> None:
self.repo.write_file('file_in_root', 'contents1')
self.repo.write_file('subdir/file_in_subdir', 'contents2')
self.repo.commit('Initial commit.')
def edenfs_logging_settings(self):
def edenfs_logging_settings(self) -> Dict[str, str]:
return {'eden.strace': 'DBG7', 'eden.fs.fuse': 'DBG7'}
@unittest.skip('TODO: this is not fully implemented yet')
def test_preserves_inode_numbers_and_timestamps_for_nonmaterialized_inodes_across_restarts(self):
def test_preserves_nonmaterialized_inode_numbers(self) -> None:
inode_paths = [
'file_in_root',
'subdir', # we care about trees too
@ -42,7 +43,8 @@ class PersistenceTest(testcase.EdenRepoTest):
os.lstat(os.path.join(self.mount, path))
for path in inode_paths]
for (path, old_stat, new_stat) in zip(inode_paths, old_stats, new_stats):
for (path, old_stat,
new_stat) in zip(inode_paths, old_stats, new_stats):
self.assertEqual(old_stat.st_ino, new_stat.st_ino,
f"inode numbers must line up for path {path}")
self.assertEqual(old_stat.st_atime, new_stat.st_atime,

View File

@ -16,11 +16,11 @@ from eden.cli import util
@testcase.eden_repo_test
class RCTest(testcase.EdenRepoTest):
def populate_repo(self):
def populate_repo(self) -> None:
self.repo.write_file('readme.txt', 'test\n')
self.repo.commit('Initial commit.')
def test_eden_list(self):
def test_eden_list(self) -> None:
mounts = self.eden.list_cmd()
self.assertEqual({self.mount: self.eden.CLIENT_ACTIVE}, mounts)
@ -33,7 +33,7 @@ class RCTest(testcase.EdenRepoTest):
mounts = self.eden.list_cmd()
self.assertEqual({self.mount: self.eden.CLIENT_ACTIVE}, mounts)
def test_unmount_rmdir(self):
def test_unmount_rmdir(self) -> None:
clients = os.path.join(self.eden_dir, 'clients')
client_names = os.listdir(clients)
self.assertEqual(1, len(client_names),
@ -60,7 +60,7 @@ class RCTest(testcase.EdenRepoTest):
self.assertEqual({self.mount: self.eden.CLIENT_ACTIVE}, mounts,
msg='The client directory should have been restored')
def test_override_system_config(self):
def test_override_system_config(self) -> None:
system_repo = self.create_repo('system_repo', self.get_repo_class())
system_repo.write_file('hello.txt', 'hola\n')
@ -68,6 +68,7 @@ class RCTest(testcase.EdenRepoTest):
system_repo_path, system_repo_type = \
util.get_repo_source_and_type(system_repo.path)
assert system_repo_type is not None
# Create temporary system config
f, path = tempfile.mkstemp(dir=self.system_config_dir)
@ -93,8 +94,8 @@ type = ''' + system_repo_type + '''
st = os.lstat(hello)
self.assertTrue(stat.S_ISREG(st.st_mode))
with open(hello, 'r') as f:
self.assertEqual('test\n', f.read())
with open(hello, 'r') as hello_file:
self.assertEqual('test\n', hello_file.read())
# Add system_repo to system config file with new name
repo_name = 'repo'
@ -119,5 +120,5 @@ type = ''' + system_repo_type + '''
st = os.lstat(hello)
self.assertTrue(stat.S_ISREG(st.st_mode))
with open(hello, 'r') as f:
self.assertEqual('hola\n', f.read())
with open(hello, 'r') as hello_file:
self.assertEqual('hola\n', hello_file.read())

View File

@ -14,17 +14,17 @@ from .lib import edenclient, gitrepo, hgrepo, testcase
@testcase.eden_repo_test
class RemountTest(testcase.EdenRepoTest):
def populate_repo(self):
def populate_repo(self) -> None:
self.repo.write_file('hello', 'hola\n')
self.repo.write_file('adir/file', 'foo!\n')
self.repo.symlink('slink', 'hello')
self.repo.commit('Initial commit.')
def select_storage_engine(self):
def select_storage_engine(self) -> str:
''' we need to persist data across restarts '''
return 'sqlite'
def test_remount_basic(self):
def test_remount_basic(self) -> None:
# Mount multiple clients
for i in range(5):
self.eden.clone(self.repo_name, self.mount + '-' + str(i))
@ -41,7 +41,7 @@ class RemountTest(testcase.EdenRepoTest):
entries = sorted(os.listdir(self.mount))
self.assertEqual(['.eden', 'adir', 'hello', 'slink'], entries)
def test_git_and_hg(self):
def test_git_and_hg(self) -> None:
# Create git and hg repositories for mounting
repo_names = {'git': 'git_repo', 'hg': 'hg_repo'}
git_repo = self.create_repo(repo_names['git'], gitrepo.GitRepository)
@ -76,7 +76,7 @@ class RemountTest(testcase.EdenRepoTest):
with open(hello, 'r') as f:
self.assertEqual('hola\n', f.read())
def test_partial_unmount(self):
def test_partial_unmount(self) -> None:
# Mount multiple clients
for i in range(5):
self.eden.clone(self.repo_name, self.mount + '-' + str(i))
@ -96,7 +96,7 @@ class RemountTest(testcase.EdenRepoTest):
# Verify that unmounted client is not remounted
self.assertFalse(os.path.exists(self.mount))
def test_restart_twice(self):
def test_restart_twice(self) -> None:
# Mount multiple clients
for i in range(5):
self.eden.clone(self.repo_name, self.mount + '-' + str(i))
@ -122,7 +122,7 @@ class RemountTest(testcase.EdenRepoTest):
self.assertFalse(os.path.exists(self.mount))
self.assertFalse(os.path.exists(self.mount + "3"))
def test_try_remount_existing_mount(self):
def test_try_remount_existing_mount(self) -> None:
'''Verify trying to mount an existing mount prints a sensible error.'''
mount_destination = self.mount + '-0'
self.eden.clone(self.repo_name, mount_destination)
@ -134,7 +134,7 @@ class RemountTest(testcase.EdenRepoTest):
context.exception.stderr)
self.assertEqual(1, context.exception.returncode)
def test_empty_config_json(self):
def test_empty_config_json(self) -> None:
for i in range(5):
self.eden.clone(self.repo_name, self.mount + '-' + str(i))
@ -152,7 +152,7 @@ class RemountTest(testcase.EdenRepoTest):
entries = sorted(os.listdir(self.mount))
self.assertEqual([], entries)
def test_deleted_config_json(self):
def test_deleted_config_json(self) -> None:
for i in range(5):
self.eden.clone(self.repo_name, self.mount + '-' + str(i))
@ -170,7 +170,7 @@ class RemountTest(testcase.EdenRepoTest):
entries = sorted(os.listdir(self.mount))
self.assertEqual([], entries)
def test_incorrect_config_json(self):
def test_incorrect_config_json(self) -> None:
for i in range(5):
self.eden.clone(self.repo_name, self.mount + '-' + str(i))
@ -197,7 +197,7 @@ class RemountTest(testcase.EdenRepoTest):
for incorrect_mount in config_data:
self.assertFalse(os.path.isdir(incorrect_mount))
def test_bad_config_json(self):
def test_bad_config_json(self) -> None:
for i in range(5):
self.eden.clone(self.repo_name, self.mount + '-' + str(i))

View File

@ -14,13 +14,13 @@ import os
@testcase.eden_repo_test
class RenameTest(testcase.EdenRepoTest):
def populate_repo(self):
def populate_repo(self) -> None:
self.repo.write_file('hello', 'hola\n')
self.repo.write_file('adir/file', 'foo!\n')
self.repo.symlink('slink', 'hello')
self.repo.commit('Initial commit.')
def test_rename_errors(self):
def test_rename_errors(self) -> None:
''' Test some error cases '''
with self.assertRaises(OSError) as context:
os.rename(os.path.join(self.mount, 'not-exist'),
@ -28,7 +28,7 @@ class RenameTest(testcase.EdenRepoTest):
self.assertEqual(errno.ENOENT, context.exception.errno,
msg='Renaming a bogus file -> ENOENT')
def test_rename_dir(self):
def test_rename_dir(self) -> None:
filename = os.path.join(self.mount, 'adir')
targetname = os.path.join(self.mount, 'a-new-target')
os.rename(filename, targetname)
@ -44,7 +44,7 @@ class RenameTest(testcase.EdenRepoTest):
with open(targetfile, 'r') as f:
self.assertEqual('foo!\n', f.read())
def test_rename_away_tree_entry(self):
def test_rename_away_tree_entry(self) -> None:
''' Rename a tree entry away and back again '''
# We should be able to rename files that are in the Tree
hello = os.path.join(self.mount, 'hello')
@ -78,7 +78,7 @@ class RenameTest(testcase.EdenRepoTest):
f.seek(0)
self.assertEqual('hola\nwoot', f.read())
def test_rename_overlay_only(self):
def test_rename_overlay_only(self) -> None:
''' Create a local/overlay only file and rename it '''
# We should be able to rename files that are in the Tree
from_name = os.path.join(self.mount, 'overlay-a')
@ -95,7 +95,9 @@ class RenameTest(testcase.EdenRepoTest):
msg='no longer visible as old name')
entries = sorted(os.listdir(self.mount))
self.assertEqual(['.eden', 'adir', 'hello', 'overlay-b', 'slink'], entries)
self.assertEqual(
['.eden', 'adir', 'hello', 'overlay-b', 'slink'], entries
)
with open(to_name, 'r') as f:
self.assertEqual('overlay-a\n', f.read(),
@ -107,7 +109,9 @@ class RenameTest(testcase.EdenRepoTest):
os.rename(to_name, from_name)
entries = sorted(os.listdir(self.mount))
self.assertEqual(['.eden', 'adir', 'hello', 'overlay-a', 'slink'], entries)
self.assertEqual(
['.eden', 'adir', 'hello', 'overlay-a', 'slink'], entries
)
with open(from_name, 'r+') as write_f:
write_f.seek(0, os.SEEK_END)
@ -116,7 +120,7 @@ class RenameTest(testcase.EdenRepoTest):
f.seek(0)
self.assertEqual('overlay-a\nwoot', f.read())
def test_rename_overlay_over_tree(self):
def test_rename_overlay_over_tree(self) -> None:
''' Make an overlay file and overwrite a tree entry with it '''
from_name = os.path.join(self.mount, 'overlay-a')
@ -139,7 +143,7 @@ class RenameTest(testcase.EdenRepoTest):
self.assertEqual('overlay-a\n', f.read(),
msg='holds correct data')
def test_rename_between_different_dirs(self):
def test_rename_between_different_dirs(self) -> None:
adir = os.path.join(self.mount, 'adir')
bdir = os.path.join(self.mount, 'bdir')
os.mkdir(bdir)

View File

@ -1,3 +1,5 @@
#!/usr/bin/env python3
#
# Copyright (c) 2016-present, Facebook, Inc.
# All rights reserved.
#
@ -5,12 +7,9 @@
# LICENSE file in the root directory of this source tree. An additional grant
# of patent rights can be found in the PATENTS file in the same directory.
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
import os
from typing import List
from .lib import gitrepo
from .lib import hgrepo
from .lib import testcase
@ -24,7 +23,7 @@ class RepoTest(testcase.EdenTestCase):
actually need to run separately with git and mercurial repositories. These
tests don't actually mount anything in eden at all.
'''
def test_list_repository(self):
def test_list_repository(self) -> None:
self.assertEqual([], self._list_repos())
config = '''\
@ -53,7 +52,7 @@ type = hg
expected = ['fbsource', 'git', 'hg-crew']
self.assertEqual(expected, self._list_repos())
def test_add_multiple(self):
def test_add_multiple(self) -> None:
hg_repo = self.create_repo('hg_repo', hgrepo.HgRepository)
git_repo = self.create_repo('git_repo', gitrepo.GitRepository)
@ -64,5 +63,5 @@ type = hg
self.eden.add_repository('git1', git_repo.path)
self.assertEqual(['git1', 'hg1', 'hg2'], self._list_repos())
def _list_repos(self):
def _list_repos(self) -> List[str]:
return self.eden.repository_cmd().splitlines()

View File

@ -15,11 +15,11 @@ import subprocess
@testcase.eden_repo_test
class SedTest(testcase.EdenRepoTest):
def populate_repo(self):
def populate_repo(self) -> None:
self.repo.write_file('hello', 'hola\n')
self.repo.commit('Initial commit.')
def test_sed(self):
def test_sed(self) -> None:
filename = os.path.join(self.mount, 'sedme')
with open(filename, 'w') as f:

View File

@ -17,12 +17,12 @@ import time
@testcase.eden_repo_test
class SetAttrTest(testcase.EdenRepoTest):
def populate_repo(self):
def populate_repo(self) -> None:
self.repo.write_file('hello', 'hola\n')
self.repo.commit('Initial commit.')
# mtime should not get changed on permission changes
def test_chmod(self):
def test_chmod(self) -> None:
filename = os.path.join(self.mount, 'hello')
st = os.lstat(filename)
@ -35,7 +35,7 @@ class SetAttrTest(testcase.EdenRepoTest):
self.assertGreaterEqual(new_st.st_ctime, st.st_ctime)
self.assertEqual(new_st.st_mode, st.st_mode | stat.S_IROTH)
def test_chown(self):
def test_chown(self) -> None:
filename = os.path.join(self.mount, 'hello')
# Chown should fail with EPERM unless we are setting it
@ -60,7 +60,7 @@ class SetAttrTest(testcase.EdenRepoTest):
msg="changing gid of a file should raise EPERM/EACCES"
)
def test_truncate(self):
def test_truncate(self) -> None:
filename = os.path.join(self.mount, 'hello')
st = os.lstat(filename)
@ -74,7 +74,7 @@ class SetAttrTest(testcase.EdenRepoTest):
self.assertGreaterEqual(new_st.st_ctime, st.st_ctime)
self.assertGreaterEqual(new_st.st_mtime, st.st_mtime)
def test_utime(self):
def test_utime(self) -> None:
filename = os.path.join(self.mount, 'hello')
# Update the atime and mtime to a time 5 seconds in the past.
@ -90,7 +90,7 @@ class SetAttrTest(testcase.EdenRepoTest):
self.assertEqual(st.st_atime, timestamp)
self.assertEqual(st.st_mtime, timestamp)
def test_touch(self):
def test_touch(self) -> None:
filename = os.path.join(self.mount, 'hello')
now = time.time()
@ -108,7 +108,7 @@ class SetAttrTest(testcase.EdenRepoTest):
self.assertGreaterEqual(st.st_atime, now)
self.assertGreaterEqual(st.st_mtime, now)
def test_umask(self):
def test_umask(self) -> None:
original_umask = os.umask(0o177)
try:
filename = os.path.join(self.mount, 'test1')
@ -123,7 +123,7 @@ class SetAttrTest(testcase.EdenRepoTest):
finally:
os.umask(original_umask)
def test_dir_addfile(self):
def test_dir_addfile(self) -> None:
dirname = os.path.join(self.mount, 'test_dir')
self.mkdir('test_dir')
@ -135,7 +135,7 @@ class SetAttrTest(testcase.EdenRepoTest):
self.assertGreaterEqual(new_st.st_ctime, st.st_ctime)
self.assertGreaterEqual(new_st.st_mtime, st.st_mtime)
def test_dir_delfile(self):
def test_dir_delfile(self) -> None:
dirname = os.path.join(self.mount, 'test_dir')
self.mkdir('test_dir')
self.write_file('test_file', 'test string')
@ -148,7 +148,7 @@ class SetAttrTest(testcase.EdenRepoTest):
self.assertGreaterEqual(new_st.st_ctime, st.st_ctime)
self.assertGreaterEqual(new_st.st_mtime, st.st_mtime)
def test_dir_change_filecontents(self):
def test_dir_change_filecontents(self) -> None:
dirname = os.path.join(self.mount, 'test_dir')
self.mkdir('test_dir')
@ -163,7 +163,7 @@ class SetAttrTest(testcase.EdenRepoTest):
# Changing permisssions of directory should change
# only ctime of the directory, but not mtime and atime.
def test_dir_change_perm(self):
def test_dir_change_perm(self) -> None:
dirname = os.path.join(self.mount, 'test_dir')
self.mkdir('test_dir')
@ -177,7 +177,7 @@ class SetAttrTest(testcase.EdenRepoTest):
# Read call on a file in Edenfs should modify the atime of the file.
# Also, open call should not change the timeStamps of a file.
def test_timestamp_openfiles(self):
def test_timestamp_openfiles(self) -> None:
filename = os.path.join(self.mount, 'hello')
st = os.lstat(filename)
with open(filename, 'r') as f:

View File

@ -11,13 +11,14 @@ import os
import resource
import sys
import threading
from typing import Dict
from .lib import testcase
@testcase.eden_repo_test
class TakeoverTest(testcase.EdenRepoTest):
def populate_repo(self):
def populate_repo(self) -> None:
self.pagesize = resource.getpagesize()
self.page1 = "1" * self.pagesize
self.page2 = "2" * self.pagesize
@ -31,18 +32,18 @@ class TakeoverTest(testcase.EdenRepoTest):
self.repo.write_file('src/test/test2.py', 'test2')
self.commit2 = self.repo.commit('Initial commit.')
def select_storage_engine(self):
def select_storage_engine(self) -> str:
''' we need to persist data across restarts '''
return 'sqlite'
def edenfs_logging_settings(self):
def edenfs_logging_settings(self) -> Dict[str, str]:
if self._testMethodName == 'test_takeover_with_io':
# test_takeover_with_io causes lots of I/O, so do not enable
# verbose logging of I/O operations in this test.
return {}
return {'eden.strace': 'DBG7', 'eden.fs.fuse': 'DBG7'}
def do_takeover_test(self):
def do_takeover_test(self) -> None:
hello = os.path.join(self.mount, 'tree/hello')
deleted = os.path.join(self.mount, 'tree/deleted')
deleted_local = os.path.join(self.mount, 'deleted-local')
@ -126,10 +127,10 @@ class TakeoverTest(testcase.EdenRepoTest):
self.assertEqual(self.page1, f.read(self.pagesize))
self.assertEqual(self.page2, f.read(self.pagesize))
def test_takeover(self):
def test_takeover(self) -> None:
return self.do_takeover_test()
def test_takeover_after_diff_revisions(self):
def test_takeover_after_diff_revisions(self) -> None:
# Make a getScmStatusBetweenRevisions() call to Eden.
# Previously this thrift call caused Eden to create temporary inode
# objects outside of the normal root inode tree, and this would cause
@ -141,7 +142,7 @@ class TakeoverTest(testcase.EdenRepoTest):
return self.do_takeover_test()
def test_takeover_with_io(self):
def test_takeover_with_io(self) -> None:
num_threads = 4
write_chunk_size = 1024 * 1024
max_file_length = write_chunk_size * 100
@ -155,7 +156,7 @@ class TakeoverTest(testcase.EdenRepoTest):
stop = threading.Event()
bufs = [b'x' * write_chunk_size, b'y' * write_chunk_size]
def do_io(thread_id, running_event):
def do_io(thread_id: int, running_event: threading.Event) -> None:
path = os.path.join(
self.mount, 'src', 'test', 'data%d.log' % thread_id
)
@ -214,7 +215,7 @@ class TakeoverTest(testcase.EdenRepoTest):
def test_takeover_preserves_inode_numbers_for_open_nonmaterialized_files(
self
):
) -> None:
hello = os.path.join(self.mount, 'tree/hello')
fd = os.open(hello, os.O_RDONLY)

View File

@ -19,7 +19,7 @@ from .lib import testcase
@testcase.eden_repo_test
class ThriftTest(testcase.EdenRepoTest):
def populate_repo(self):
def populate_repo(self) -> None:
self.repo.write_file('hello', 'hola\n')
self.repo.write_file('README', 'docs\n')
self.repo.write_file('adir/file', 'foo!\n')
@ -32,32 +32,34 @@ class ThriftTest(testcase.EdenRepoTest):
self.repo.remove_file('README')
self.commit2 = self.repo.commit('Commit 2.')
def setUp(self):
def setUp(self) -> None:
super().setUp()
self.client = self.get_thrift_client()
self.client.open()
self.addCleanup(self.client.close)
def get_loaded_inodes_count(self, path):
def get_loaded_inodes_count(self, path: str) -> int:
result = self.client.debugInodeStatus(self.mount, path)
inode_count = 0
for item in result:
assert item.entries is not None
for inode in item.entries:
if inode.loaded:
inode_count += 1
return inode_count
def test_list_mounts(self):
def test_list_mounts(self) -> None:
mounts = self.client.listMounts()
self.assertEqual(1, len(mounts))
mount = mounts[0]
self.assertEqual(self.mount, mount.mountPoint)
assert mount.edenClientPath is not None
# The client path should always be inside the main eden directory
self.assertTrue(mount.edenClientPath.startswith(self.eden.eden_dir),
msg='unexpected client path: %r' % mount.edenClientPath)
def test_get_sha1(self):
def test_get_sha1(self) -> None:
expected_sha1_for_hello = hashlib.sha1(b'hola\n').digest()
result_for_hello = SHA1Result()
result_for_hello.set_sha1(expected_sha1_for_hello)
@ -73,30 +75,30 @@ class ThriftTest(testcase.EdenRepoTest):
], self.client.getSHA1(self.mount, ['hello', 'adir/file'])
)
def test_get_sha1_throws_for_empty_string(self):
def test_get_sha1_throws_for_empty_string(self) -> None:
results = self.client.getSHA1(self.mount, [''])
self.assertEqual(1, len(results))
self.assert_error(results[0], 'path cannot be the empty string')
def test_get_sha1_throws_for_directory(self):
def test_get_sha1_throws_for_directory(self) -> None:
results = self.client.getSHA1(self.mount, ['adir'])
self.assertEqual(1, len(results))
self.assert_error(results[0], 'adir: Is a directory')
def test_get_sha1_throws_for_non_existent_file(self):
def test_get_sha1_throws_for_non_existent_file(self) -> None:
results = self.client.getSHA1(self.mount, ['i_do_not_exist'])
self.assertEqual(1, len(results))
self.assert_error(results[0],
'i_do_not_exist: No such file or directory')
def test_get_sha1_throws_for_symlink(self):
def test_get_sha1_throws_for_symlink(self) -> None:
'''Fails because caller should resolve the symlink themselves.'''
results = self.client.getSHA1(self.mount, ['slink'])
self.assertEqual(1, len(results))
self.assert_error(results[0],
'slink: file is a symlink: Invalid argument')
def assert_error(self, sha1result, error_message):
def assert_error(self, sha1result: SHA1Result, error_message: str) -> None:
self.assertIsNotNone(sha1result, msg='Must pass a SHA1Result')
self.assertEqual(
SHA1Result.ERROR,
@ -107,7 +109,7 @@ class ThriftTest(testcase.EdenRepoTest):
self.assertIsNotNone(error)
self.assertEqual(error_message, error.message)
def test_glob(self):
def test_glob(self) -> None:
self.assertEqual(
['adir/file'], self.client.glob(self.mount, ['a*/file']))
self.assertCountEqual(
@ -139,7 +141,7 @@ class ThriftTest(testcase.EdenRepoTest):
self.assertIn('unterminated bracket sequence',
str(ctx.exception))
def test_unload_free_inodes(self):
def test_unload_free_inodes(self) -> None:
for i in range(100):
self.write_file('testfile%d.txt' % i, 'unload test case')
@ -159,17 +161,13 @@ class ThriftTest(testcase.EdenRepoTest):
'Number of loaded inodes should reduce after unload'
)
def read_file(self, filename):
with open(filename, 'r') as f:
f.read()
def get_counter(self, name):
def get_counter(self, name: str) -> int:
self.client.flushStatsNow()
return self.client.getCounters()[name]
def test_invalidate_inode_cache(self):
filename = os.path.join(self.mount, 'bdir/file')
dirname = os.path.join(self.mount, 'bdir/')
def test_invalidate_inode_cache(self) -> None:
filename = 'bdir/file'
full_dirname = os.path.join(self.mount, 'bdir/')
# Exercise eden a bit to make sure counters are ready
for _ in range(20):
@ -192,16 +190,16 @@ class ThriftTest(testcase.EdenRepoTest):
lookups = self.get_counter("fuse.lookup_us.count")
# -hl makes ls to do a lookup of the file to determine type
os.system("ls -hl " + dirname + " > /dev/null")
os.system("ls -hl " + full_dirname + " > /dev/null")
lookups_1ls = self.get_counter("fuse.lookup_us.count")
# equal, the file was lookup'ed above.
self.assertEqual(lookups, lookups_1ls)
self.client.invalidateKernelInodeCache(self.mount, 'bdir')
os.system("ls -hl " + dirname + " > /dev/null")
os.system("ls -hl " + full_dirname + " > /dev/null")
lookups_2ls = self.get_counter("fuse.lookup_us.count")
self.assertEqual(lookups_1ls + 1, lookups_2ls)
def test_diff_revisions(self):
def test_diff_revisions(self) -> None:
# Convert the commit hashes to binary for the thrift call
with self.get_thrift_client() as client:
diff = client.getScmStatusBetweenRevisions(
@ -219,7 +217,7 @@ class ThriftTest(testcase.EdenRepoTest):
}
)
def test_diff_revisions_hex(self):
def test_diff_revisions_hex(self) -> None:
# Watchman currently clals getScmStatusBetweenRevisions()
# with 40-byte hexadecimal commit IDs, so make sure that works.
with self.get_thrift_client() as client:

View File

@ -19,18 +19,18 @@ PAYLOAD = b'W00t\n'
@testcase.eden_repo_test
class UnixSocketTest(testcase.EdenRepoTest):
def populate_repo(self):
def populate_repo(self) -> None:
self.repo.write_file('hello', 'hola\n')
self.repo.commit('Initial commit.')
def test_unixsock(self):
def test_unixsock(self) -> None:
'''Create a UNIX domain socket in EdenFS and verify that a client
can connect and write, and that the server side can accept
and read data from it.'''
filename = os.path.join(self.mount, 'example.sock')
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
sock.setblocking(0) # ensure that we don't block unexpectedly
sock.setblocking(False) # ensure that we don't block unexpectedly
try:
sock.bind(filename)
st = os.lstat(filename)
@ -38,10 +38,10 @@ class UnixSocketTest(testcase.EdenRepoTest):
sock.listen(1)
class Client(threading.Thread):
def run(self):
def run(self) -> None:
s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
try:
s.setblocking(0) # don't block here either
s.setblocking(False) # don't block here either
s.connect(filename)
s.send(PAYLOAD)
finally:
@ -53,7 +53,9 @@ class UnixSocketTest(testcase.EdenRepoTest):
client_thread.start()
readable, _, _ = select.select([sock], [], [], 2)
self.assertTrue(sock in readable, msg='sock should be ready for accept')
self.assertTrue(
sock in readable, msg='sock should be ready for accept'
)
conn, addr = sock.accept()
data = conn.recv(len(PAYLOAD))
self.assertEqual(PAYLOAD, data)

View File

@ -15,13 +15,13 @@ import shutil
@testcase.eden_repo_test
class UnlinkTest(testcase.EdenRepoTest):
def populate_repo(self):
def populate_repo(self) -> None:
self.repo.write_file('hello', 'hola\n')
self.repo.write_file('adir/file', 'foo!\n')
self.repo.symlink('slink', 'hello')
self.repo.commit('Initial commit.')
def test_unlink(self):
def test_unlink(self) -> None:
filename = os.path.join(self.mount, 'hello')
# This file is part of the git repo
@ -36,20 +36,20 @@ class UnlinkTest(testcase.EdenRepoTest):
self.assertEqual(context.exception.errno, errno.ENOENT,
msg='lstat on a removed file raises ENOENT')
def test_unlink_bogus_file(self):
def test_unlink_bogus_file(self) -> None:
with self.assertRaises(OSError) as context:
os.unlink(os.path.join(self.mount, 'this-file-does-not-exist'))
self.assertEqual(context.exception.errno, errno.ENOENT,
msg='unlink raises ENOENT for nonexistent file')
def test_unlink_dir(self):
def test_unlink_dir(self) -> None:
adir = os.path.join(self.mount, 'adir')
with self.assertRaises(OSError) as context:
os.unlink(adir)
self.assertEqual(context.exception.errno, errno.EISDIR,
msg='unlink on a dir raises EISDIR')
def test_unlink_empty_dir(self):
def test_unlink_empty_dir(self) -> None:
adir = os.path.join(self.mount, 'an-empty-dir')
os.mkdir(adir)
with self.assertRaises(OSError) as context:
@ -57,7 +57,7 @@ class UnlinkTest(testcase.EdenRepoTest):
self.assertEqual(context.exception.errno, errno.EISDIR,
msg='unlink on an empty dir raises EISDIR')
def test_rmdir_file(self):
def test_rmdir_file(self) -> None:
filename = os.path.join(self.mount, 'hello')
with self.assertRaises(OSError) as context:
@ -65,7 +65,7 @@ class UnlinkTest(testcase.EdenRepoTest):
self.assertEqual(context.exception.errno, errno.ENOTDIR,
msg='rmdir on a file raises ENOTDIR')
def test_rmdir(self):
def test_rmdir(self) -> None:
adir = os.path.join(self.mount, 'adir')
with self.assertRaises(OSError) as context:
os.rmdir(adir)
@ -78,7 +78,7 @@ class UnlinkTest(testcase.EdenRepoTest):
self.assertEqual(context.exception.errno, errno.ENOENT,
msg='lstat on a removed dir raises ENOENT')
def test_rmdir_overlay(self):
def test_rmdir_overlay(self) -> None:
# Ensure that removing dirs works with things we make in the overlay
deep_root = os.path.join(self.mount, 'buck-out')
deep_name = os.path.join(deep_root, 'foo', 'bar', 'baz')

View File

@ -13,17 +13,17 @@ import hashlib
import os
def sha1(value):
def sha1(value: bytes) -> str:
return hashlib.sha1(value).hexdigest()
@testcase.eden_repo_test
class XattrTest(testcase.EdenRepoTest):
def populate_repo(self):
def populate_repo(self) -> None:
self.repo.write_file('hello', 'hola\n')
self.repo.commit('Initial commit.')
def test_get_sha1_xattr(self):
def test_get_sha1_xattr(self) -> None:
filename = os.path.join(self.mount, 'hello')
xattr = fs.getxattr(filename, 'user.sha1')
contents = open(filename, 'rb').read()
@ -47,7 +47,7 @@ class XattrTest(testcase.EdenRepoTest):
self.assertEqual(sha1(b'foobarbaz'),
fs.getxattr(filename, 'user.sha1'))
def test_listxattr(self):
def test_listxattr(self) -> None:
filename = os.path.join(self.mount, 'hello')
xattrs = fs.listxattr(filename)
contents = open(filename, 'rb').read()