mirror of
https://github.com/facebook/sapling.git
synced 2024-10-04 22:07:44 +03:00
getdeps: add an add_fb_python_unittest()
function
Summary: Add a function for defining Python unit tests. This creates the test executable, and also emits logic to perform test discovery for ctest. Reviewed By: simpkins Differential Revision: D17610034 fbshipit-source-id: cdf15b0b04acc1d3e906a1e2a95eb327951176ba
This commit is contained in:
parent
76de96dacf
commit
5293f4631b
@ -62,6 +62,11 @@ find_program(
|
||||
FB_MAKE_PYTHON_ARCHIVE "make_fbpy_archive.py"
|
||||
PATHS ${CMAKE_MODULE_PATH}
|
||||
)
|
||||
set(FB_PY_TEST_MAIN "${CMAKE_CURRENT_LIST_DIR}/fb_py_test_main.py")
|
||||
set(
|
||||
FB_PY_TEST_DISCOVER_SCRIPT
|
||||
"${CMAKE_CURRENT_LIST_DIR}/FBPythonTestAddTests.cmake"
|
||||
)
|
||||
|
||||
# An option to control the default installation location for
|
||||
# install_fb_python_library(). This is relative to ${CMAKE_INSTALL_PREFIX}
|
||||
@ -164,6 +169,119 @@ function(add_fb_python_executable EXE_NAME)
|
||||
PROPERTY EXECUTABLE "${CMAKE_CURRENT_BINARY_DIR}/${output_file}")
|
||||
endfunction()
|
||||
|
||||
# Define a python unittest executable.
|
||||
# The executable is built using add_fb_python_executable and has the
|
||||
# following differences:
|
||||
#
|
||||
# Each of the source files specified in SOURCES will be imported
|
||||
# and have unittest discovery performed upon them.
|
||||
# Those sources will be imported in the top level namespace.
|
||||
#
|
||||
# The ENV argument allows specifying a list of "KEY=VALUE"
|
||||
# pairs that will be used by the test runner to set up the environment
|
||||
# in the child process prior to running the test. This is useful for
|
||||
# passing additional configuration to the test.
|
||||
function(add_fb_python_unittest TARGET)
|
||||
# Parse the arguments
|
||||
set(multi_value_args SOURCES DEPENDS ENV PROPERTIES)
|
||||
set(
|
||||
one_value_args
|
||||
WORKING_DIRECTORY BASE_DIR NAMESPACE TEST_LIST DISCOVERY_TIMEOUT
|
||||
)
|
||||
fb_cmake_parse_args(
|
||||
ARG "" "${one_value_args}" "${multi_value_args}" "${ARGN}"
|
||||
)
|
||||
fb_py_process_default_args(ARG_NAMESPACE ARG_BASE_DIR)
|
||||
if(NOT ARG_WORKING_DIRECTORY)
|
||||
# Default the working directory to the current binary directory.
|
||||
# This matches the default behavior of add_test() and other standard
|
||||
# test functions like gtest_discover_tests()
|
||||
set(ARG_WORKING_DIRECTORY "${CMAKE_CURRENT_BINARY_DIR}")
|
||||
endif()
|
||||
if(NOT ARG_TEST_LIST)
|
||||
set(ARG_TEST_LIST "${TARGET}_TESTS")
|
||||
endif()
|
||||
if(NOT ARG_DISCOVERY_TIMEOUT)
|
||||
set(ARG_DISCOVERY_TIMEOUT 5)
|
||||
endif()
|
||||
|
||||
# Tell our test program the list of modules to scan for tests.
|
||||
# We scan all modules directly listed in our SOURCES argument, and skip
|
||||
# modules that came from dependencies in the DEPENDS list.
|
||||
#
|
||||
# This is written into a __test_modules__.py module that the test runner
|
||||
# will look at.
|
||||
set(
|
||||
test_modules_path
|
||||
"${CMAKE_CURRENT_BINARY_DIR}/${TARGET}_test_modules.py"
|
||||
)
|
||||
file(WRITE "${test_modules_path}" "TEST_MODULES = [\n")
|
||||
string(REPLACE "." "/" namespace_dir "${ARG_NAMESPACE}")
|
||||
if (NOT "${namespace_dir}" STREQUAL "")
|
||||
set(namespace_dir "${namespace_dir}/")
|
||||
endif()
|
||||
set(test_modules)
|
||||
foreach(src_path IN LISTS ARG_SOURCES)
|
||||
fb_py_compute_dest_path(
|
||||
abs_source dest_path
|
||||
"${src_path}" "${namespace_dir}" "${ARG_BASE_DIR}"
|
||||
)
|
||||
string(REPLACE "/" "." module_name "${dest_path}")
|
||||
string(REGEX REPLACE "\\.py$" "" module_name "${module_name}")
|
||||
list(APPEND test_modules "${module_name}")
|
||||
file(APPEND "${test_modules_path}" " '${module_name}',\n")
|
||||
endforeach()
|
||||
file(APPEND "${test_modules_path}" "]\n")
|
||||
|
||||
# The __main__ is provided by our runner wrapper/bootstrap
|
||||
list(APPEND ARG_SOURCES "${FB_PY_TEST_MAIN}=__main__.py")
|
||||
list(APPEND ARG_SOURCES "${test_modules_path}=__test_modules__.py")
|
||||
|
||||
add_fb_python_executable(
|
||||
"${TARGET}"
|
||||
NAMESPACE "${ARG_NAMESPACE}"
|
||||
BASE_DIR "${ARG_BASE_DIR}"
|
||||
SOURCES ${ARG_SOURCES}
|
||||
DEPENDS ${ARG_DEPENDS}
|
||||
)
|
||||
|
||||
# Run test discovery after the test executable is built.
|
||||
# This logic is based on the code for gtest_discover_tests()
|
||||
set(ctest_file_base "${CMAKE_CURRENT_BINARY_DIR}/${TARGET}")
|
||||
set(ctest_include_file "${ctest_file_base}_include.cmake")
|
||||
set(ctest_tests_file "${ctest_file_base}_tests.cmake")
|
||||
add_custom_command(
|
||||
TARGET "${TARGET}.GEN_PY_EXE" POST_BUILD
|
||||
BYPRODUCTS "${ctest_tests_file}"
|
||||
COMMAND
|
||||
"${CMAKE_COMMAND}"
|
||||
-D "TEST_TARGET=${TARGET}"
|
||||
-D "TEST_INTERPRETER=${Python3_EXECUTABLE}"
|
||||
-D "TEST_ENV=${ARG_ENV}"
|
||||
-D "TEST_EXECUTABLE=$<TARGET_PROPERTY:${TARGET}.GEN_PY_EXE,EXECUTABLE>"
|
||||
-D "TEST_WORKING_DIR=${ARG_WORKING_DIRECTORY}"
|
||||
-D "TEST_LIST=${ARG_TEST_LIST}"
|
||||
-D "TEST_PREFIX=${TARGET}::"
|
||||
-D "TEST_PROPERTIES=${ARG_PROPERTIES}"
|
||||
-D "CTEST_FILE=${ctest_tests_file}"
|
||||
-P "${FB_PY_TEST_DISCOVER_SCRIPT}"
|
||||
VERBATIM
|
||||
)
|
||||
|
||||
file(
|
||||
WRITE "${ctest_include_file}"
|
||||
"if(EXISTS \"${ctest_tests_file}\")\n"
|
||||
" include(\"${ctest_tests_file}\")\n"
|
||||
"else()\n"
|
||||
" add_test(\"${TARGET}_NOT_BUILT\" \"${TARGET}_NOT_BUILT\")\n"
|
||||
"endif()\n"
|
||||
)
|
||||
set_property(
|
||||
DIRECTORY APPEND PROPERTY TEST_INCLUDE_FILES
|
||||
"${ctest_include_file}"
|
||||
)
|
||||
endfunction()
|
||||
|
||||
#
|
||||
# Define a python library.
|
||||
#
|
||||
@ -278,35 +396,11 @@ function(add_fb_python_library LIB_NAME)
|
||||
file(WRITE "${tmp_manifest}" "FBPY_MANIFEST 1\n")
|
||||
set(abs_sources)
|
||||
foreach(src_path IN LISTS ARG_SOURCES)
|
||||
if("${src_path}" MATCHES "=")
|
||||
# We want to split the string on the `=` sign, but cmake doesn't
|
||||
# provide much in the way of helpers for this, so we rewrite the
|
||||
# `=` sign to `;` so that we can treat it as a cmake list and
|
||||
# then index into the components
|
||||
string(REPLACE "=" ";" src_path_list "${src_path}")
|
||||
list(GET src_path_list 0 src_path)
|
||||
# Note that we ignore the `namespace_dir` in the alias case
|
||||
# in order to allow aliasing a source to the top level `__main__.py`
|
||||
# filename.
|
||||
list(GET src_path_list 1 dest_path)
|
||||
else()
|
||||
unset(dest_path)
|
||||
endif()
|
||||
|
||||
get_filename_component(abs_source "${src_path}" ABSOLUTE)
|
||||
fb_py_compute_dest_path(
|
||||
abs_source dest_path
|
||||
"${src_path}" "${namespace_dir}" "${ARG_BASE_DIR}"
|
||||
)
|
||||
list(APPEND abs_sources "${abs_source}")
|
||||
file(RELATIVE_PATH rel_src "${ARG_BASE_DIR}" "${abs_source}")
|
||||
|
||||
if(NOT DEFINED dest_path)
|
||||
if("${rel_src}" MATCHES "^../")
|
||||
message(
|
||||
FATAL_ERROR "${LIB_NAME}: source file \"${abs_source}\" is not inside "
|
||||
"the base directory ${ARG_BASE_DIR}"
|
||||
)
|
||||
endif()
|
||||
set(dest_path "${namespace_dir}${rel_src}")
|
||||
endif()
|
||||
|
||||
target_sources(
|
||||
"${LIB_NAME}.py_lib" INTERFACE
|
||||
"$<BUILD_INTERFACE:${abs_source}>"
|
||||
@ -489,7 +583,42 @@ function(fb_py_check_available)
|
||||
if (NOT FB_MAKE_PYTHON_ARCHIVE)
|
||||
message(
|
||||
FATAL_ERROR "unable to find make_fbpy_archive.py helper program (it "
|
||||
"should be located in the same directory as FBPythonRules.cmake)"
|
||||
"should be located in the same directory as FBPythonBinary.cmake)"
|
||||
)
|
||||
endif()
|
||||
endfunction()
|
||||
|
||||
function(
|
||||
fb_py_compute_dest_path
|
||||
src_path_output dest_path_output src_path namespace_dir base_dir
|
||||
)
|
||||
if("${src_path}" MATCHES "=")
|
||||
# We want to split the string on the `=` sign, but cmake doesn't
|
||||
# provide much in the way of helpers for this, so we rewrite the
|
||||
# `=` sign to `;` so that we can treat it as a cmake list and
|
||||
# then index into the components
|
||||
string(REPLACE "=" ";" src_path_list "${src_path}")
|
||||
list(GET src_path_list 0 src_path)
|
||||
# Note that we ignore the `namespace_dir` in the alias case
|
||||
# in order to allow aliasing a source to the top level `__main__.py`
|
||||
# filename.
|
||||
list(GET src_path_list 1 dest_path)
|
||||
else()
|
||||
unset(dest_path)
|
||||
endif()
|
||||
|
||||
get_filename_component(abs_source "${src_path}" ABSOLUTE)
|
||||
if(NOT DEFINED dest_path)
|
||||
file(RELATIVE_PATH rel_src "${ARG_BASE_DIR}" "${abs_source}")
|
||||
if("${rel_src}" MATCHES "^../")
|
||||
message(
|
||||
FATAL_ERROR "${LIB_NAME}: source file \"${abs_source}\" is not inside "
|
||||
"the base directory ${ARG_BASE_DIR}"
|
||||
)
|
||||
endif()
|
||||
set(dest_path "${namespace_dir}${rel_src}")
|
||||
endif()
|
||||
|
||||
set("${src_path_output}" "${abs_source}" PARENT_SCOPE)
|
||||
set("${dest_path_output}" "${dest_path}" PARENT_SCOPE)
|
||||
endfunction()
|
||||
|
59
build/fbcode_builder/CMake/FBPythonTestAddTests.cmake
Normal file
59
build/fbcode_builder/CMake/FBPythonTestAddTests.cmake
Normal file
@ -0,0 +1,59 @@
|
||||
# Copyright (c) Facebook, Inc. and its affiliates.
|
||||
|
||||
# Add a command to be emitted to the CTest file
|
||||
set(ctest_script)
|
||||
function(add_command CMD)
|
||||
set(escaped_args "")
|
||||
foreach(arg ${ARGN})
|
||||
# Escape all arguments using "Bracket Argument" syntax
|
||||
# We could skip this for argument that don't contain any special
|
||||
# characters if we wanted to make the output slightly more human-friendly.
|
||||
set(escaped_args "${escaped_args} [==[${arg}]==]")
|
||||
endforeach()
|
||||
set(ctest_script "${ctest_script}${CMD}(${escaped_args})\n" PARENT_SCOPE)
|
||||
endfunction()
|
||||
|
||||
if(NOT EXISTS "${TEST_EXECUTABLE}")
|
||||
message(FATAL_ERROR "Test executable does not exist: ${TEST_EXECUTABLE}")
|
||||
endif()
|
||||
execute_process(
|
||||
COMMAND ${CMAKE_COMMAND} -E env ${TEST_ENV} "${TEST_INTERPRETER}" "${TEST_EXECUTABLE}" --list-tests
|
||||
WORKING_DIRECTORY "${TEST_WORKING_DIR}"
|
||||
OUTPUT_VARIABLE output
|
||||
RESULT_VARIABLE result
|
||||
)
|
||||
if(NOT "${result}" EQUAL 0)
|
||||
string(REPLACE "\n" "\n " output "${output}")
|
||||
message(
|
||||
FATAL_ERROR
|
||||
"Error running test executable: ${TEST_EXECUTABLE}\n"
|
||||
"Output:\n"
|
||||
" ${output}\n"
|
||||
)
|
||||
endif()
|
||||
|
||||
# Parse output
|
||||
string(REPLACE "\n" ";" tests_list "${output}")
|
||||
foreach(test_name ${tests_list})
|
||||
add_command(
|
||||
add_test
|
||||
"${TEST_PREFIX}${test_name}"
|
||||
${CMAKE_COMMAND} -E env ${TEST_ENV}
|
||||
"${TEST_INTERPRETER}" "${TEST_EXECUTABLE}" "${test_name}"
|
||||
)
|
||||
add_command(
|
||||
set_tests_properties
|
||||
"${TEST_PREFIX}${test_name}"
|
||||
PROPERTIES
|
||||
WORKING_DIRECTORY "${TEST_WORKING_DIR}"
|
||||
${TEST_PROPERTIES}
|
||||
)
|
||||
endforeach()
|
||||
|
||||
# Set a list of discovered tests in the parent scope, in case users
|
||||
# want access to this list as a CMake variable
|
||||
if(TEST_LIST)
|
||||
add_command(set ${TEST_LIST} ${tests_list})
|
||||
endif()
|
||||
|
||||
file(WRITE "${CTEST_FILE}" "${ctest_script}")
|
820
build/fbcode_builder/CMake/fb_py_test_main.py
Normal file
820
build/fbcode_builder/CMake/fb_py_test_main.py
Normal file
@ -0,0 +1,820 @@
|
||||
#!/usr/bin/env python
|
||||
#
|
||||
# Copyright (c) Facebook, Inc. and its affiliates.
|
||||
#
|
||||
"""
|
||||
This file contains the main module code for Python test programs.
|
||||
"""
|
||||
|
||||
from __future__ import print_function
|
||||
|
||||
import contextlib
|
||||
import ctypes
|
||||
import fnmatch
|
||||
import json
|
||||
import logging
|
||||
import optparse
|
||||
import os
|
||||
import platform
|
||||
import re
|
||||
import sys
|
||||
import tempfile
|
||||
import time
|
||||
import traceback
|
||||
import unittest
|
||||
import warnings
|
||||
|
||||
# Hide warning about importing "imp"; remove once python2 is gone.
|
||||
with warnings.catch_warnings():
|
||||
warnings.filterwarnings("ignore", category=DeprecationWarning)
|
||||
import imp
|
||||
|
||||
try:
|
||||
from StringIO import StringIO
|
||||
except ImportError:
|
||||
from io import StringIO
|
||||
try:
|
||||
import coverage
|
||||
except ImportError:
|
||||
coverage = None # type: ignore
|
||||
try:
|
||||
from importlib.machinery import SourceFileLoader
|
||||
except ImportError:
|
||||
SourceFileLoader = None # type: ignore
|
||||
|
||||
|
||||
class get_cpu_instr_counter(object):
|
||||
def read(self):
|
||||
# TODO
|
||||
return 0
|
||||
|
||||
|
||||
EXIT_CODE_SUCCESS = 0
|
||||
EXIT_CODE_TEST_FAILURE = 70
|
||||
|
||||
|
||||
class TestStatus(object):
|
||||
|
||||
ABORTED = "FAILURE"
|
||||
PASSED = "SUCCESS"
|
||||
FAILED = "FAILURE"
|
||||
EXPECTED_FAILURE = "SUCCESS"
|
||||
UNEXPECTED_SUCCESS = "FAILURE"
|
||||
SKIPPED = "ASSUMPTION_VIOLATION"
|
||||
|
||||
|
||||
class PathMatcher(object):
|
||||
def __init__(self, include_patterns, omit_patterns):
|
||||
self.include_patterns = include_patterns
|
||||
self.omit_patterns = omit_patterns
|
||||
|
||||
def omit(self, path):
|
||||
"""
|
||||
Omit iff matches any of the omit_patterns or the include patterns are
|
||||
not empty and none is matched
|
||||
"""
|
||||
path = os.path.realpath(path)
|
||||
return any(fnmatch.fnmatch(path, p) for p in self.omit_patterns) or (
|
||||
self.include_patterns
|
||||
and not any(fnmatch.fnmatch(path, p) for p in self.include_patterns)
|
||||
)
|
||||
|
||||
def include(self, path):
|
||||
return not self.omit(path)
|
||||
|
||||
|
||||
class DebugWipeFinder(object):
|
||||
"""
|
||||
PEP 302 finder that uses a DebugWipeLoader for all files which do not need
|
||||
coverage
|
||||
"""
|
||||
|
||||
def __init__(self, matcher):
|
||||
self.matcher = matcher
|
||||
|
||||
def find_module(self, fullname, path=None):
|
||||
_, _, basename = fullname.rpartition(".")
|
||||
try:
|
||||
fd, pypath, (_, _, kind) = imp.find_module(basename, path)
|
||||
except Exception:
|
||||
# Maybe it's a top level module
|
||||
try:
|
||||
fd, pypath, (_, _, kind) = imp.find_module(basename, None)
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
if hasattr(fd, "close"):
|
||||
fd.close()
|
||||
if kind != imp.PY_SOURCE:
|
||||
return None
|
||||
if self.matcher.include(pypath):
|
||||
return None
|
||||
|
||||
"""
|
||||
This is defined to match CPython's PyVarObject struct
|
||||
"""
|
||||
|
||||
class PyVarObject(ctypes.Structure):
|
||||
_fields_ = [
|
||||
("ob_refcnt", ctypes.c_long),
|
||||
("ob_type", ctypes.c_void_p),
|
||||
("ob_size", ctypes.c_ulong),
|
||||
]
|
||||
|
||||
class DebugWipeLoader(SourceFileLoader):
|
||||
"""
|
||||
PEP302 loader that zeros out debug information before execution
|
||||
"""
|
||||
|
||||
def get_code(self, fullname):
|
||||
code = super(DebugWipeLoader, self).get_code(fullname)
|
||||
if code:
|
||||
# Ideally we'd do
|
||||
# code.co_lnotab = b''
|
||||
# But code objects are READONLY. Not to worry though; we'll
|
||||
# directly modify CPython's object
|
||||
code_impl = PyVarObject.from_address(id(code.co_lnotab))
|
||||
code_impl.ob_size = 0
|
||||
return code
|
||||
|
||||
return DebugWipeLoader(fullname, pypath)
|
||||
|
||||
|
||||
def optimize_for_coverage(cov, include_patterns, omit_patterns):
|
||||
"""
|
||||
We get better performance if we zero out debug information for files which
|
||||
we're not interested in. Only available in CPython 3.3+
|
||||
"""
|
||||
matcher = PathMatcher(include_patterns, omit_patterns)
|
||||
if SourceFileLoader and platform.python_implementation() == "CPython":
|
||||
sys.meta_path.insert(0, DebugWipeFinder(matcher))
|
||||
|
||||
|
||||
class TeeStream(object):
|
||||
def __init__(self, *streams):
|
||||
self._streams = streams
|
||||
|
||||
def write(self, data):
|
||||
for stream in self._streams:
|
||||
stream.write(data)
|
||||
|
||||
def flush(self):
|
||||
for stream in self._streams:
|
||||
stream.flush()
|
||||
|
||||
def isatty(self):
|
||||
return False
|
||||
|
||||
|
||||
class CallbackStream(object):
|
||||
def __init__(self, callback, bytes_callback=None, orig=None):
|
||||
self._callback = callback
|
||||
self._fileno = orig.fileno() if orig else None
|
||||
|
||||
# Python 3 APIs:
|
||||
# - `encoding` is a string holding the encoding name
|
||||
# - `errors` is a string holding the error-handling mode for encoding
|
||||
# - `buffer` should look like an io.BufferedIOBase object
|
||||
|
||||
self.errors = orig.errors if orig else None
|
||||
if bytes_callback:
|
||||
# those members are only on the io.TextIOWrapper
|
||||
self.encoding = orig.encoding if orig else "UTF-8"
|
||||
self.buffer = CallbackStream(bytes_callback, orig=orig)
|
||||
|
||||
def write(self, data):
|
||||
self._callback(data)
|
||||
|
||||
def flush(self):
|
||||
pass
|
||||
|
||||
def isatty(self):
|
||||
return False
|
||||
|
||||
def fileno(self):
|
||||
return self._fileno
|
||||
|
||||
|
||||
class BuckTestResult(unittest._TextTestResult):
|
||||
"""
|
||||
Our own TestResult class that outputs data in a format that can be easily
|
||||
parsed by buck's test runner.
|
||||
"""
|
||||
|
||||
_instr_counter = get_cpu_instr_counter()
|
||||
|
||||
def __init__(
|
||||
self, stream, descriptions, verbosity, show_output, main_program, suite
|
||||
):
|
||||
super(BuckTestResult, self).__init__(stream, descriptions, verbosity)
|
||||
self._main_program = main_program
|
||||
self._suite = suite
|
||||
self._results = []
|
||||
self._current_test = None
|
||||
self._saved_stdout = sys.stdout
|
||||
self._saved_stderr = sys.stderr
|
||||
self._show_output = show_output
|
||||
|
||||
def getResults(self):
|
||||
return self._results
|
||||
|
||||
def startTest(self, test):
|
||||
super(BuckTestResult, self).startTest(test)
|
||||
|
||||
# Pass in the real stdout and stderr filenos. We can't really do much
|
||||
# here to intercept callers who directly operate on these fileno
|
||||
# objects.
|
||||
sys.stdout = CallbackStream(
|
||||
self.addStdout, self.addStdoutBytes, orig=sys.stdout
|
||||
)
|
||||
sys.stderr = CallbackStream(
|
||||
self.addStderr, self.addStderrBytes, orig=sys.stderr
|
||||
)
|
||||
self._current_test = test
|
||||
self._test_start_time = time.time()
|
||||
self._current_status = TestStatus.ABORTED
|
||||
self._messages = []
|
||||
self._stacktrace = None
|
||||
self._stdout = ""
|
||||
self._stderr = ""
|
||||
self._start_instr_count = self._instr_counter.read()
|
||||
|
||||
def _find_next_test(self, suite):
|
||||
"""
|
||||
Find the next test that has not been run.
|
||||
"""
|
||||
|
||||
for test in suite:
|
||||
|
||||
# We identify test suites by test that are iterable (as is done in
|
||||
# the builtin python test harness). If we see one, recurse on it.
|
||||
if hasattr(test, "__iter__"):
|
||||
test = self._find_next_test(test)
|
||||
|
||||
# The builtin python test harness sets test references to `None`
|
||||
# after they have run, so we know we've found the next test up
|
||||
# if it's not `None`.
|
||||
if test is not None:
|
||||
return test
|
||||
|
||||
def stopTest(self, test):
|
||||
sys.stdout = self._saved_stdout
|
||||
sys.stderr = self._saved_stderr
|
||||
|
||||
super(BuckTestResult, self).stopTest(test)
|
||||
|
||||
# If a failure occured during module/class setup, then this "test" may
|
||||
# actually be a `_ErrorHolder`, which doesn't contain explicit info
|
||||
# about the upcoming test. Since we really only care about the test
|
||||
# name field (i.e. `_testMethodName`), we use that to detect an actual
|
||||
# test cases, and fall back to looking the test up from the suite
|
||||
# otherwise.
|
||||
if not hasattr(test, "_testMethodName"):
|
||||
test = self._find_next_test(self._suite)
|
||||
|
||||
result = {
|
||||
"testCaseName": "{0}.{1}".format(
|
||||
test.__class__.__module__, test.__class__.__name__
|
||||
),
|
||||
"testCase": test._testMethodName,
|
||||
"type": self._current_status,
|
||||
"time": int((time.time() - self._test_start_time) * 1000),
|
||||
"message": os.linesep.join(self._messages),
|
||||
"stacktrace": self._stacktrace,
|
||||
"stdOut": self._stdout,
|
||||
"stdErr": self._stderr,
|
||||
}
|
||||
|
||||
# TestPilot supports an instruction count field.
|
||||
if "TEST_PILOT" in os.environ:
|
||||
result["instrCount"] = (
|
||||
int(self._instr_counter.read() - self._start_instr_count),
|
||||
)
|
||||
|
||||
self._results.append(result)
|
||||
self._current_test = None
|
||||
|
||||
def stopTestRun(self):
|
||||
cov = self._main_program.get_coverage()
|
||||
if cov is not None:
|
||||
self._results.append({"coverage": cov})
|
||||
|
||||
@contextlib.contextmanager
|
||||
def _withTest(self, test):
|
||||
self.startTest(test)
|
||||
yield
|
||||
self.stopTest(test)
|
||||
|
||||
def _setStatus(self, test, status, message=None, stacktrace=None):
|
||||
assert test == self._current_test
|
||||
self._current_status = status
|
||||
self._stacktrace = stacktrace
|
||||
if message is not None:
|
||||
if message.endswith(os.linesep):
|
||||
message = message[:-1]
|
||||
self._messages.append(message)
|
||||
|
||||
def setStatus(self, test, status, message=None, stacktrace=None):
|
||||
# addError() may be called outside of a test if one of the shared
|
||||
# fixtures (setUpClass/tearDownClass/setUpModule/tearDownModule)
|
||||
# throws an error.
|
||||
#
|
||||
# In this case, create a fake test result to record the error.
|
||||
if self._current_test is None:
|
||||
with self._withTest(test):
|
||||
self._setStatus(test, status, message, stacktrace)
|
||||
else:
|
||||
self._setStatus(test, status, message, stacktrace)
|
||||
|
||||
def setException(self, test, status, excinfo):
|
||||
exctype, value, tb = excinfo
|
||||
self.setStatus(
|
||||
test,
|
||||
status,
|
||||
"{0}: {1}".format(exctype.__name__, value),
|
||||
"".join(traceback.format_tb(tb)),
|
||||
)
|
||||
|
||||
def addSuccess(self, test):
|
||||
super(BuckTestResult, self).addSuccess(test)
|
||||
self.setStatus(test, TestStatus.PASSED)
|
||||
|
||||
def addError(self, test, err):
|
||||
super(BuckTestResult, self).addError(test, err)
|
||||
self.setException(test, TestStatus.ABORTED, err)
|
||||
|
||||
def addFailure(self, test, err):
|
||||
super(BuckTestResult, self).addFailure(test, err)
|
||||
self.setException(test, TestStatus.FAILED, err)
|
||||
|
||||
def addSkip(self, test, reason):
|
||||
super(BuckTestResult, self).addSkip(test, reason)
|
||||
self.setStatus(test, TestStatus.SKIPPED, "Skipped: %s" % (reason,))
|
||||
|
||||
def addExpectedFailure(self, test, err):
|
||||
super(BuckTestResult, self).addExpectedFailure(test, err)
|
||||
self.setException(test, TestStatus.EXPECTED_FAILURE, err)
|
||||
|
||||
def addUnexpectedSuccess(self, test):
|
||||
super(BuckTestResult, self).addUnexpectedSuccess(test)
|
||||
self.setStatus(test, TestStatus.UNEXPECTED_SUCCESS, "Unexpected success")
|
||||
|
||||
def addStdout(self, val):
|
||||
self._stdout += val
|
||||
if self._show_output:
|
||||
self._saved_stdout.write(val)
|
||||
self._saved_stdout.flush()
|
||||
|
||||
def addStdoutBytes(self, val):
|
||||
string = val.decode("utf-8", errors="backslashreplace")
|
||||
self.addStdout(string)
|
||||
|
||||
def addStderr(self, val):
|
||||
self._stderr += val
|
||||
if self._show_output:
|
||||
self._saved_stderr.write(val)
|
||||
self._saved_stderr.flush()
|
||||
|
||||
def addStderrBytes(self, val):
|
||||
string = val.decode("utf-8", errors="backslashreplace")
|
||||
self.addStderr(string)
|
||||
|
||||
|
||||
class BuckTestRunner(unittest.TextTestRunner):
|
||||
def __init__(self, main_program, suite, show_output=True, **kwargs):
|
||||
super(BuckTestRunner, self).__init__(**kwargs)
|
||||
self.show_output = show_output
|
||||
self._main_program = main_program
|
||||
self._suite = suite
|
||||
|
||||
def _makeResult(self):
|
||||
return BuckTestResult(
|
||||
self.stream,
|
||||
self.descriptions,
|
||||
self.verbosity,
|
||||
self.show_output,
|
||||
self._main_program,
|
||||
self._suite,
|
||||
)
|
||||
|
||||
|
||||
def _format_test_name(test_class, attrname):
|
||||
return "{0}.{1}.{2}".format(test_class.__module__, test_class.__name__, attrname)
|
||||
|
||||
|
||||
class StderrLogHandler(logging.StreamHandler):
|
||||
"""
|
||||
This class is very similar to logging.StreamHandler, except that it
|
||||
always uses the current sys.stderr object.
|
||||
|
||||
StreamHandler caches the current sys.stderr object when it is constructed.
|
||||
This makes it behave poorly in unit tests, which may replace sys.stderr
|
||||
with a StringIO buffer during tests. The StreamHandler will continue using
|
||||
the old sys.stderr object instead of the desired StringIO buffer.
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
logging.Handler.__init__(self)
|
||||
|
||||
@property
|
||||
def stream(self):
|
||||
return sys.stderr
|
||||
|
||||
|
||||
class RegexTestLoader(unittest.TestLoader):
|
||||
def __init__(self, regex=None):
|
||||
self.regex = regex
|
||||
super(RegexTestLoader, self).__init__()
|
||||
|
||||
def getTestCaseNames(self, testCaseClass):
|
||||
"""
|
||||
Return a sorted sequence of method names found within testCaseClass
|
||||
"""
|
||||
|
||||
testFnNames = super(RegexTestLoader, self).getTestCaseNames(testCaseClass)
|
||||
if self.regex is None:
|
||||
return testFnNames
|
||||
robj = re.compile(self.regex)
|
||||
matched = []
|
||||
for attrname in testFnNames:
|
||||
fullname = _format_test_name(testCaseClass, attrname)
|
||||
if robj.search(fullname):
|
||||
matched.append(attrname)
|
||||
return matched
|
||||
|
||||
|
||||
class Loader(object):
|
||||
|
||||
suiteClass = unittest.TestSuite
|
||||
|
||||
def __init__(self, modules, regex=None):
|
||||
self.modules = modules
|
||||
self.regex = regex
|
||||
|
||||
def load_all(self):
|
||||
loader = RegexTestLoader(self.regex)
|
||||
test_suite = self.suiteClass()
|
||||
for module_name in self.modules:
|
||||
__import__(module_name, level=0)
|
||||
module = sys.modules[module_name]
|
||||
module_suite = loader.loadTestsFromModule(module)
|
||||
test_suite.addTest(module_suite)
|
||||
return test_suite
|
||||
|
||||
def load_args(self, args):
|
||||
loader = RegexTestLoader(self.regex)
|
||||
|
||||
suites = []
|
||||
for arg in args:
|
||||
suite = loader.loadTestsFromName(arg)
|
||||
# loadTestsFromName() can only process names that refer to
|
||||
# individual test functions or modules. It can't process package
|
||||
# names. If there were no module/function matches, check to see if
|
||||
# this looks like a package name.
|
||||
if suite.countTestCases() != 0:
|
||||
suites.append(suite)
|
||||
continue
|
||||
|
||||
# Load all modules whose name is <arg>.<something>
|
||||
prefix = arg + "."
|
||||
for module in self.modules:
|
||||
if module.startswith(prefix):
|
||||
suite = loader.loadTestsFromName(module)
|
||||
suites.append(suite)
|
||||
|
||||
return loader.suiteClass(suites)
|
||||
|
||||
|
||||
_COVERAGE_INI = '''\
|
||||
[report]
|
||||
exclude_lines =
|
||||
pragma: no cover
|
||||
pragma: nocover
|
||||
pragma:.*no${PLATFORM}
|
||||
pragma:.*no${PY_IMPL}${PY_MAJOR}${PY_MINOR}
|
||||
pragma:.*no${PY_IMPL}${PY_MAJOR}
|
||||
pragma:.*nopy${PY_MAJOR}
|
||||
pragma:.*nopy${PY_MAJOR}${PY_MINOR}
|
||||
'''
|
||||
|
||||
|
||||
class MainProgram(object):
|
||||
"""
|
||||
This class implements the main program. It can be subclassed by
|
||||
users who wish to customize some parts of the main program.
|
||||
(Adding additional command line options, customizing test loading, etc.)
|
||||
"""
|
||||
|
||||
DEFAULT_VERBOSITY = 2
|
||||
|
||||
def __init__(self, argv):
|
||||
self.init_option_parser()
|
||||
self.parse_options(argv)
|
||||
self.setup_logging()
|
||||
|
||||
def init_option_parser(self):
|
||||
usage = "%prog [options] [TEST] ..."
|
||||
op = optparse.OptionParser(usage=usage, add_help_option=False)
|
||||
self.option_parser = op
|
||||
|
||||
op.add_option(
|
||||
"--hide-output",
|
||||
dest="show_output",
|
||||
action="store_false",
|
||||
default=True,
|
||||
help="Suppress data that tests print to stdout/stderr, and only "
|
||||
"show it if the test fails.",
|
||||
)
|
||||
op.add_option(
|
||||
"-o",
|
||||
"--output",
|
||||
help="Write results to a file in a JSON format to be read by Buck",
|
||||
)
|
||||
op.add_option(
|
||||
"-f",
|
||||
"--failfast",
|
||||
action="store_true",
|
||||
default=False,
|
||||
help="Stop after the first failure",
|
||||
)
|
||||
op.add_option(
|
||||
"-l",
|
||||
"--list-tests",
|
||||
action="store_true",
|
||||
dest="list",
|
||||
default=False,
|
||||
help="List tests and exit",
|
||||
)
|
||||
op.add_option(
|
||||
"-r",
|
||||
"--regex",
|
||||
default=None,
|
||||
help="Regex to apply to tests, to only run those tests",
|
||||
)
|
||||
op.add_option(
|
||||
"--collect-coverage",
|
||||
action="store_true",
|
||||
default=False,
|
||||
help="Collect test coverage information",
|
||||
)
|
||||
op.add_option(
|
||||
"--coverage-include",
|
||||
default="*",
|
||||
help='File globs to include in converage (split by ",")',
|
||||
)
|
||||
op.add_option(
|
||||
"--coverage-omit",
|
||||
default="",
|
||||
help='File globs to omit from converage (split by ",")',
|
||||
)
|
||||
op.add_option(
|
||||
"--logger",
|
||||
action="append",
|
||||
metavar="<category>=<level>",
|
||||
default=[],
|
||||
help="Configure log levels for specific logger categories",
|
||||
)
|
||||
op.add_option(
|
||||
"-q",
|
||||
"--quiet",
|
||||
action="count",
|
||||
default=0,
|
||||
help="Decrease the verbosity (may be specified multiple times)",
|
||||
)
|
||||
op.add_option(
|
||||
"-v",
|
||||
"--verbosity",
|
||||
action="count",
|
||||
default=self.DEFAULT_VERBOSITY,
|
||||
help="Increase the verbosity (may be specified multiple times)",
|
||||
)
|
||||
op.add_option(
|
||||
"-?", "--help", action="help", help="Show this help message and exit"
|
||||
)
|
||||
|
||||
def parse_options(self, argv):
|
||||
self.options, self.test_args = self.option_parser.parse_args(argv[1:])
|
||||
self.options.verbosity -= self.options.quiet
|
||||
|
||||
if self.options.collect_coverage and coverage is None:
|
||||
self.option_parser.error("coverage module is not available")
|
||||
self.options.coverage_include = self.options.coverage_include.split(",")
|
||||
if self.options.coverage_omit == "":
|
||||
self.options.coverage_omit = []
|
||||
else:
|
||||
self.options.coverage_omit = self.options.coverage_omit.split(",")
|
||||
|
||||
def setup_logging(self):
|
||||
# Configure the root logger to log at INFO level.
|
||||
# This is similar to logging.basicConfig(), but uses our
|
||||
# StderrLogHandler instead of a StreamHandler.
|
||||
fmt = logging.Formatter("%(pathname)s:%(lineno)s: %(message)s")
|
||||
log_handler = StderrLogHandler()
|
||||
log_handler.setFormatter(fmt)
|
||||
root_logger = logging.getLogger()
|
||||
root_logger.addHandler(log_handler)
|
||||
root_logger.setLevel(logging.INFO)
|
||||
|
||||
level_names = {
|
||||
"debug": logging.DEBUG,
|
||||
"info": logging.INFO,
|
||||
"warn": logging.WARNING,
|
||||
"warning": logging.WARNING,
|
||||
"error": logging.ERROR,
|
||||
"critical": logging.CRITICAL,
|
||||
"fatal": logging.FATAL,
|
||||
}
|
||||
|
||||
for value in self.options.logger:
|
||||
parts = value.rsplit("=", 1)
|
||||
if len(parts) != 2:
|
||||
self.option_parser.error(
|
||||
"--logger argument must be of the "
|
||||
"form <name>=<level>: %s" % value
|
||||
)
|
||||
name = parts[0]
|
||||
level_name = parts[1].lower()
|
||||
level = level_names.get(level_name)
|
||||
if level is None:
|
||||
self.option_parser.error(
|
||||
"invalid log level %r for log " "category %s" % (parts[1], name)
|
||||
)
|
||||
logging.getLogger(name).setLevel(level)
|
||||
|
||||
def create_loader(self):
|
||||
import __test_modules__
|
||||
|
||||
return Loader(__test_modules__.TEST_MODULES, self.options.regex)
|
||||
|
||||
def load_tests(self):
|
||||
loader = self.create_loader()
|
||||
if self.options.collect_coverage:
|
||||
self.start_coverage()
|
||||
include = self.options.coverage_include
|
||||
omit = self.options.coverage_omit
|
||||
if include and "*" not in include:
|
||||
optimize_for_coverage(self.cov, include, omit)
|
||||
|
||||
if self.test_args:
|
||||
suite = loader.load_args(self.test_args)
|
||||
else:
|
||||
suite = loader.load_all()
|
||||
if self.options.collect_coverage:
|
||||
self.cov.start()
|
||||
return suite
|
||||
|
||||
def get_tests(self, test_suite):
|
||||
tests = []
|
||||
|
||||
for test in test_suite:
|
||||
if isinstance(test, unittest.TestSuite):
|
||||
tests.extend(self.get_tests(test))
|
||||
else:
|
||||
tests.append(test)
|
||||
|
||||
return tests
|
||||
|
||||
def run(self):
|
||||
test_suite = self.load_tests()
|
||||
|
||||
if self.options.list:
|
||||
for test in self.get_tests(test_suite):
|
||||
method_name = getattr(test, "_testMethodName", "")
|
||||
name = _format_test_name(test.__class__, method_name)
|
||||
print(name)
|
||||
return EXIT_CODE_SUCCESS
|
||||
else:
|
||||
result = self.run_tests(test_suite)
|
||||
if self.options.output is not None:
|
||||
with open(self.options.output, "w") as f:
|
||||
json.dump(result.getResults(), f, indent=4, sort_keys=True)
|
||||
if not result.wasSuccessful():
|
||||
return EXIT_CODE_TEST_FAILURE
|
||||
return EXIT_CODE_SUCCESS
|
||||
|
||||
def run_tests(self, test_suite):
|
||||
# Install a signal handler to catch Ctrl-C and display the results
|
||||
# (but only if running >2.6).
|
||||
if sys.version_info[0] > 2 or sys.version_info[1] > 6:
|
||||
unittest.installHandler()
|
||||
|
||||
# Run the tests
|
||||
runner = BuckTestRunner(
|
||||
self,
|
||||
test_suite,
|
||||
verbosity=self.options.verbosity,
|
||||
show_output=self.options.show_output,
|
||||
)
|
||||
result = runner.run(test_suite)
|
||||
|
||||
if self.options.collect_coverage and self.options.show_output:
|
||||
self.cov.stop()
|
||||
try:
|
||||
self.cov.report(file=sys.stdout)
|
||||
except coverage.misc.CoverageException:
|
||||
print("No lines were covered, potentially restricted by file filters")
|
||||
|
||||
return result
|
||||
|
||||
def get_abbr_impl(self):
|
||||
"""Return abbreviated implementation name."""
|
||||
impl = platform.python_implementation()
|
||||
if impl == "PyPy":
|
||||
return "pp"
|
||||
elif impl == "Jython":
|
||||
return "jy"
|
||||
elif impl == "IronPython":
|
||||
return "ip"
|
||||
elif impl == "CPython":
|
||||
return "cp"
|
||||
else:
|
||||
raise RuntimeError("unknown python runtime")
|
||||
|
||||
def start_coverage(self):
|
||||
if not self.options.collect_coverage:
|
||||
return
|
||||
|
||||
with tempfile.NamedTemporaryFile('w', delete=False) as coverage_ini:
|
||||
coverage_ini.write(_COVERAGE_INI)
|
||||
self._coverage_ini_path = coverage_ini.name
|
||||
|
||||
# Keep the original working dir in case tests use os.chdir
|
||||
self._original_working_dir = os.getcwd()
|
||||
|
||||
# for coverage config ignores by platform/python version
|
||||
os.environ["PLATFORM"] = sys.platform
|
||||
os.environ["PY_IMPL"] = self.get_abbr_impl()
|
||||
os.environ["PY_MAJOR"] = str(sys.version_info.major)
|
||||
os.environ["PY_MINOR"] = str(sys.version_info.minor)
|
||||
|
||||
self.cov = coverage.Coverage(
|
||||
include=self.options.coverage_include,
|
||||
omit=self.options.coverage_omit,
|
||||
config_file=coverage_ini.name,
|
||||
)
|
||||
self.cov.erase()
|
||||
self.cov.start()
|
||||
|
||||
def get_coverage(self):
|
||||
if not self.options.collect_coverage:
|
||||
return None
|
||||
|
||||
try:
|
||||
os.remove(self._coverage_ini_path)
|
||||
except OSError:
|
||||
pass # Better to litter than to fail the test
|
||||
|
||||
# Switch back to the original working directory.
|
||||
os.chdir(self._original_working_dir)
|
||||
|
||||
result = {}
|
||||
|
||||
self.cov.stop()
|
||||
|
||||
try:
|
||||
f = StringIO()
|
||||
self.cov.report(file=f)
|
||||
lines = f.getvalue().split("\n")
|
||||
except coverage.misc.CoverageException:
|
||||
# Nothing was covered. That's fine by us
|
||||
return result
|
||||
|
||||
# N.B.: the format of the coverage library's output differs
|
||||
# depending on whether one or more files are in the results
|
||||
for line in lines[2:]:
|
||||
if line.strip("-") == "":
|
||||
break
|
||||
r = line.split()[0]
|
||||
analysis = self.cov.analysis2(r)
|
||||
covString = self.convert_to_diff_cov_str(analysis)
|
||||
if covString:
|
||||
result[r] = covString
|
||||
|
||||
return result
|
||||
|
||||
def convert_to_diff_cov_str(self, analysis):
|
||||
# Info on the format of analysis:
|
||||
# http://nedbatchelder.com/code/coverage/api.html
|
||||
if not analysis:
|
||||
return None
|
||||
numLines = max(
|
||||
analysis[1][-1] if len(analysis[1]) else 0,
|
||||
analysis[2][-1] if len(analysis[2]) else 0,
|
||||
analysis[3][-1] if len(analysis[3]) else 0,
|
||||
)
|
||||
lines = ["N"] * numLines
|
||||
for l in analysis[1]:
|
||||
lines[l - 1] = "C"
|
||||
for l in analysis[2]:
|
||||
lines[l - 1] = "X"
|
||||
for l in analysis[3]:
|
||||
lines[l - 1] = "U"
|
||||
return "".join(lines)
|
||||
|
||||
|
||||
def main(argv):
|
||||
return MainProgram(sys.argv).run()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main(sys.argv))
|
Loading…
Reference in New Issue
Block a user