add python type information to more integration test code

Summary: Annotate more integration test functions with type information.

Reviewed By: bolinfest

Differential Revision: D6434358

fbshipit-source-id: b88351eebee58561465752378c6771b7b1f9554e
This commit is contained in:
Adam Simpkins 2017-11-29 14:23:57 -08:00 committed by Facebook Github Bot
parent 71981cc504
commit a3aa8d11e7
5 changed files with 60 additions and 49 deletions

View File

@ -15,11 +15,10 @@ import json
import os
def _find_post_clone():
post_clone = os.environ.get('EDENFS_POST_CLONE_PATH')
if not post_clone:
post_clone = os.path.join(find_executables.BUCK_OUT,
'gen/eden/hooks/hg/post-clone.par')
def _find_post_clone() -> str:
post_clone = (os.environ.get('EDENFS_POST_CLONE_PATH') or
os.path.join(find_executables.BUCK_OUT,
'gen/eden/hooks/hg/post-clone.par'))
if not os.access(post_clone, os.X_OK):
msg = ('unable to find post-clone script for integration testing: {!r}'
.format(post_clone))
@ -27,7 +26,7 @@ def _find_post_clone():
return post_clone
def _eden_ext_dir():
def _eden_ext_dir() -> str:
check_locations = [
# In dev mode, the python_binary link-tree can be found here:
'buck-out/gen/eden/hg/eden/eden#link-tree',

View File

@ -8,20 +8,24 @@
# of patent rights can be found in the PATENTS file in the same directory.
import os
from ..lib import hgrepo
from .lib.hg_extension_test_base import EdenHgTestCase, hg_test
from eden.integration.hg.lib.hg_extension_test_base import (
EdenHgTestCase,
hg_test
)
from eden.integration.lib import hgrepo
from textwrap import dedent
from typing import Dict
@hg_test
class UpdateTest(EdenHgTestCase):
def edenfs_logging_settings(self):
def edenfs_logging_settings(self) -> Dict[str, str]:
return {
'eden.fs.inodes.TreeInode': 'DBG5',
'eden.fs.inodes.CheckoutAction': 'DBG5',
}
def populate_backing_repo(self, repo):
def populate_backing_repo(self, repo: hgrepo.HgRepository) -> None:
repo.write_file('hello.txt', 'hola')
repo.write_file('.gitignore', 'ignoreme\n')
repo.write_file('foo/.gitignore', '*.log\n')
@ -35,7 +39,7 @@ class UpdateTest(EdenHgTestCase):
repo.write_file('foo/bar.txt', 'updated in commit 3\n')
self.commit3 = repo.commit('Update foo/.gitignore')
def test_update_clean_reverts_modified_files(self):
def test_update_clean_reverts_modified_files(self) -> None:
'''Test using `hg update --clean .` to revert file modifications.'''
self.assert_status_empty()
@ -46,7 +50,7 @@ class UpdateTest(EdenHgTestCase):
self.assertEqual('hola', self.read_file('hello.txt'))
self.assert_status_empty()
def test_update_clean_removes_added_and_removed_statuses(self):
def test_update_clean_removes_added_and_removed_statuses(self) -> None:
'''Test using `hg update --clean .` in the presence of added and removed
files.'''
self.write_file('bar/some_new_file.txt', 'new file\n')
@ -60,7 +64,7 @@ class UpdateTest(EdenHgTestCase):
self.assertTrue(os.path.isfile(self.get_path('foo/bar.txt')))
self.assert_dirstate_empty()
def test_update_with_gitignores(self):
def test_update_with_gitignores(self) -> None:
'''
Test `hg update` with gitignore files.
@ -104,7 +108,7 @@ class UpdateTest(EdenHgTestCase):
self.assertEqual('*.log\n', self.read_file('foo/.gitignore'))
self.assertEqual('test\n', self.read_file('foo/bar.txt'))
def test_update_with_new_commits(self):
def test_update_with_new_commits(self) -> None:
'''
Test running `hg update` to check out commits that were created after
the edenfs daemon originally started.
@ -123,7 +127,7 @@ class UpdateTest(EdenHgTestCase):
self.assertEqual(new_contents, self.read_file('foo/bar.txt'))
self.assert_status_empty()
def test_reset(self):
def test_reset(self) -> None:
'''
Test `hg reset`
'''
@ -138,7 +142,7 @@ class UpdateTest(EdenHgTestCase):
self.assert_status_empty()
self.assertEqual('test\n', self.read_file('foo/bar.txt'))
def test_update_replace_untracked_dir(self):
def test_update_replace_untracked_dir(self) -> None:
'''
Create a local untracked directory, then run "hg update -C" to
checkout a commit where this directory exists in source control.
@ -175,7 +179,7 @@ class UpdateTest(EdenHgTestCase):
'new_project/newcode.o': 'I',
})
def test_update_with_merge_flag_and_conflict(self):
def test_update_with_merge_flag_and_conflict(self) -> None:
self.write_file('foo/bar.txt', 'changing yet again\n')
with self.assertRaises(hgrepo.HgError) as context:
self.hg('update', '.^', '--merge')
@ -197,15 +201,19 @@ class UpdateTest(EdenHgTestCase):
)
self.assertEqual(expected_contents, self.read_file('foo/bar.txt'))
def test_update_with_added_file_that_is_tracked_in_destination(self):
def test_update_with_added_file_that_is_tracked_in_destination(
self
) -> None:
self._test_update_with_local_file_that_is_tracked_in_destination(True)
def test_update_with_untracked_file_that_is_tracked_in_destination(self):
def test_update_with_untracked_file_that_is_tracked_in_destination(
self
) -> None:
self._test_update_with_local_file_that_is_tracked_in_destination(False)
def _test_update_with_local_file_that_is_tracked_in_destination(
self, add_before_updating: bool
):
) -> None:
base_commit = self.repo.get_head_hash()
original_contents = 'Original contents.\n'
self.write_file('some_new_file.txt', original_contents)
@ -216,7 +224,7 @@ class UpdateTest(EdenHgTestCase):
# Do an `hg prev` and re-create the new file with different contents.
self.repo.update(base_commit)
self.assert_status_empty()
self.assertFalse(os.path.exists('some_new_file.txt'))
self.assertFalse(os.path.exists(self.get_path('some_new_file.txt')))
modified_contents = 'Re-create the file with different contents.\n'
self.write_file('some_new_file.txt', modified_contents)
@ -244,7 +252,7 @@ class UpdateTest(EdenHgTestCase):
self.assertTrue(os.path.isfile(expected_backup_file))
self.assertEqual(modified_contents, self.read_file(path_to_backup))
def test_update_modified_file_to_removed_file_taking_other(self):
def test_update_modified_file_to_removed_file_taking_other(self) -> None:
self.write_file('some_new_file.txt', 'I am new!\n')
self.hg('add', 'some_new_file.txt')
self.repo.commit('Commit a new file.')
@ -262,7 +270,7 @@ class UpdateTest(EdenHgTestCase):
':other was specified explicitly.'
)
def test_update_modified_file_to_removed_file_taking_local(self):
def test_update_modified_file_to_removed_file_taking_local(self) -> None:
self.write_file('some_new_file.txt', 'I am new!\n')
self.hg('add', 'some_new_file.txt')
self.repo.commit('Commit a new file.')
@ -275,7 +283,7 @@ class UpdateTest(EdenHgTestCase):
self.assertEqual(new_contents, self.read_file('some_new_file.txt'))
self.assert_status({'some_new_file.txt': 'A'})
def test_update_ignores_untracked_directory(self):
def test_update_ignores_untracked_directory(self) -> None:
head = self.repo.log()[-1]
self.mkdir('foo/bar')
self.write_file('foo/bar/a.txt', 'File in directory two levels deep.\n')

View File

@ -14,7 +14,7 @@ import subprocess
import sys
import tempfile
import time
from typing import Optional, Union
from typing import Optional
import eden.thrift
from fb303.ttypes import fb_status
@ -61,7 +61,7 @@ class EdenFS(object):
self.shutdown()
def _wait_for_healthy(
self, timeout: Union[int, float], exclude_pid: Optional[int]=None
self, timeout: float, exclude_pid: Optional[int]=None
):
'''Wait for edenfs to start and report that it is healthy.
@ -154,7 +154,7 @@ class EdenFS(object):
return cmd
def start(
self, timeout: Union[int, float]=30, takeover_from: Optional[int]=None
self, timeout: float=30, takeover_from: Optional[int]=None
):
'''
Run "eden daemon" to start the eden daemon.

View File

@ -7,6 +7,7 @@
# LICENSE file in the root directory of this source tree. An additional grant
# of patent rights can be found in the PATENTS file in the same directory.
import configparser
import datetime
import distutils.spawn
import os
@ -27,7 +28,7 @@ class HgError(subprocess.CalledProcessError):
super().__init__(orig.returncode, orig.cmd,
output=orig.output, stderr=orig.stderr)
def __str__(self):
def __str__(self) -> str:
if not self.stderr:
return super().__str__()
@ -47,7 +48,7 @@ class HgError(subprocess.CalledProcessError):
class HgRepository(repobase.Repository):
def __init__(self, path):
def __init__(self, path: str) -> None:
'''
If hgrc is specified, it will be used as the value of the HGRCPATH
environment variable when `hg` is run.
@ -98,7 +99,7 @@ class HgRepository(repobase.Repository):
else:
return None
def init(self, hgrc=None):
def init(self, hgrc: configparser.ConfigParser = None) -> None:
'''
Initialize a new hg repository by running 'hg init'
@ -108,17 +109,20 @@ class HgRepository(repobase.Repository):
'''
self.hg('init')
if hgrc is not None:
hgrc_path = os.path.join(self.path, '.hg', 'hgrc')
with open(hgrc_path, 'a') as f:
hgrc.write(f)
self.write_hgrc(hgrc)
def get_type(self):
def write_hgrc(self, hgrc: configparser.ConfigParser) -> None:
hgrc_path = os.path.join(self.path, '.hg', 'hgrc')
with open(hgrc_path, 'a') as f:
hgrc.write(f)
def get_type(self) -> str:
return 'hg'
def get_head_hash(self):
def get_head_hash(self) -> str:
return self.hg('log', '-r.', '-T{node}')
def get_canonical_root(self):
def get_canonical_root(self) -> str:
return self.path
def add_files(self, paths: List[str]) -> None:
@ -173,7 +177,7 @@ class HgRepository(repobase.Repository):
# Get the commit ID and return it
return self.hg('log', '-T{node}', '-r.')
def log(self, template='{node}', revset='::.'):
def log(self, template: str = '{node}', revset: str = '::.') -> List[str]:
'''Runs `hg log` with the specified template and revset.
Returns the log output, as a list with one entry per commit.'''
@ -188,18 +192,18 @@ class HgRepository(repobase.Repository):
output = self.hg('log', '-T', template, '-r', revset)
return output.split(delimiter)[:-1]
def status(self):
def status(self) -> str:
'''Returns the output of `hg status` as a string.'''
return self.hg('status')
def update(self, rev, clean=False):
def update(self, rev: str, clean: bool = False) -> None:
if clean:
args = ['update', '--clean', rev]
else:
args = ['update', rev]
self.hg(*args, stdout=None, stderr=None)
def reset(self, rev, keep=True):
def reset(self, rev: str, keep: bool = True) -> None:
if keep:
args = ['reset', '--keep', rev]
else:

View File

@ -15,6 +15,7 @@ import os
import shutil
import tempfile
import time
import typing
import unittest
from hypothesis import settings, HealthCheck
import hypothesis.strategies as st
@ -70,7 +71,7 @@ if is_sandcastle() and not edenclient.can_run_eden():
# This is avoiding a reporting noise issue in our CI that files
# tasks about skipped tests. Let's just skip defining most of them
# to avoid the noise if we know that they won't work anyway.
TestParent = object
TestParent = typing.cast(unittest.TestCase, object)
else:
TestParent = unittest.TestCase
@ -126,7 +127,6 @@ class EdenTestCase(TestParent):
self.start = time.time()
self.last_event = self.start
self.tmp_dir = None
self.eden = None
self.old_home = None
@ -240,17 +240,17 @@ class EdenTestCase(TestParent):
return repo
def get_path(self, path):
def get_path(self, path: str) -> str:
'''Resolves the path against self.mount.'''
return os.path.join(self.mount, path)
def touch(self, path):
def touch(self, path: str) -> None:
'''Touch the file at the specified path relative to the clone.'''
fullpath = self.get_path(path)
with open(fullpath, 'a'):
os.utime(fullpath)
def write_file(self, path, contents, mode=0o644):
def write_file(self, path: str, contents: str, mode: int = 0o644) -> None:
'''Create or overwrite a file with the given contents.'''
fullpath = self.get_path(path)
self.make_parent_dir(fullpath)
@ -258,7 +258,7 @@ class EdenTestCase(TestParent):
f.write(contents)
os.chmod(fullpath, mode)
def read_file(self, path):
def read_file(self, path: str) -> str:
'''Read the file with the specified path inside the eden repository,
and return its contents.
'''
@ -266,7 +266,7 @@ class EdenTestCase(TestParent):
with open(fullpath, 'r') as f:
return f.read()
def mkdir(self, path):
def mkdir(self, path: str) -> None:
'''Call mkdir for the specified path relative to the clone.'''
full_path = self.get_path(path)
try:
@ -275,12 +275,12 @@ class EdenTestCase(TestParent):
if ex.errno != errno.EEXIST:
raise
def make_parent_dir(self, path):
def make_parent_dir(self, path: str) -> None:
dirname = os.path.dirname(path)
if dirname:
self.mkdir(dirname)
def rm(self, path):
def rm(self, path: str) -> None:
'''Unlink the file at the specified path relative to the clone.'''
os.unlink(self.get_path(path))