sapling/eden/test_support/temporary_directory.py
Xavier Deguillard 303d37ab96 test_support: canonicalize the temporary directory path
Summary:
With D30320515, EdenFS internally canonicalizes all the mount paths passed to
it. As a result, the output of `eden list` may not match the path given to
`eden mount` if one of the directories leading to the mount point is a symlink.
Since some tests compare both, this can lead to test failures.

To solve this, we simply need to make sure that the temporary directory for the
test is canonicalized.

Reviewed By: fanzeyi

Differential Revision: D30349411

fbshipit-source-id: 139d4be02b5783c6a439270845239acab6a6c955
2021-08-16 16:08:45 -07:00

236 lines
7.7 KiB
Python

#!/usr/bin/env python3
# Copyright (c) Facebook, Inc. and its affiliates.
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2.
# pyre-strict
import abc
import logging
import os
import pathlib
import shutil
import tempfile
import types
import typing
from pathlib import Path
from typing import (
Any,
BinaryIO,
Callable,
Generic,
Optional,
TextIO,
Tuple,
Type,
TypeVar,
Union,
)
def cleanup_tmp_dir(tmp_dir: Path) -> None:
    """Clean up a temporary directory.

    This is similar to shutil.rmtree() but also handles removing entries that live
    inside read-only directories: when removing an entry fails with a permission
    error, the entry's parent directory is made writable (mode 0o755) and the
    removal is retried once.

    This is necessary for removing Eden checkout directories since "eden clone" makes
    the original mount point directory read-only.

    Errors reported by shutil.rmtree() are logged as warnings instead of being
    raised.

    Args:
        tmp_dir: The top-level directory to remove, together with its contents.
    """
    # shutil.rmtree() may hand the callback either the object we passed in or a plain
    # string, so compare decoded string forms rather than relying on `str == Path`
    # (which is never true).
    top_level = os.fsdecode(tmp_dir)

    # If we encounter an EPERM or EACCES error removing a file try making its parent
    # directory writable and then retry the removal.
    def _remove_readonly(
        # pyre-fixme[24]: Generic type `os.PathLike` expects 1 type parameter.
        func: Callable[[Union[os.PathLike, str]], Any],  # pyre-fixme[2]
        # pyre-fixme[24]: Generic type `os.PathLike` expects 1 type parameter.
        path: Union[os.PathLike, str],
        exc_info: Tuple[Type[BaseException], BaseException, types.TracebackType],
    ) -> None:
        _ex_type, ex, _traceback = exc_info

        # Never touch the permissions of the directory that *contains* the
        # temporary directory: if the top-level entry itself cannot be removed,
        # just report it.
        if os.fsdecode(path) == top_level:
            logging.warning(
                f"failed to remove temporary test directory {tmp_dir}: {ex}"
            )
            return

        # Only permission problems can be fixed by adjusting the parent's mode.
        if not isinstance(ex, PermissionError):
            logging.warning(f"error removing file in temporary directory {path}: {ex}")
            return

        try:
            # func() is the function that failed.
            # This is usually os.unlink() or os.rmdir(). I have started to see
            # open fail as well. We don't have all the right arguments to retry
            # the call, so we just have to deal with the failure. If you are
            # debugging failures here, this is probably not the root cause of
            # the problem, but an error cleaning up a broken test.
            if func not in (os.unlink, os.rmdir):
                # Re-raised only so that the handler below logs it.
                raise ex

            parent_dir = os.path.dirname(path)
            os.chmod(parent_dir, 0o755)
            func(path)
        except OSError as retry_ex:
            logging.warning(
                f"error removing file in temporary directory {path}: {retry_ex}"
            )
            return

    shutil.rmtree(tmp_dir, onerror=_remove_readonly)
class TempFileManager:
    """TempFileManager exists for managing a set of temporary files and directories.

    It creates all temporary files and directories in a single top-level directory,
    which can later be cleaned up in one pass.

    This helps make it a little easier to track temporary test artifacts while
    debugging, and helps make it easier to identify when tests have failed to clean up
    their temporary files.

    This is also necessary on Windows because the standard tempfile.NamedTemporaryFile
    class unfortunately does not work well there: the temporary files it creates cannot
    be opened by other processes.

    The manager can be used as a context manager; leaving the `with` block calls
    cleanup(), passing whether the block exited with an exception.
    """

    # Top-level directory holding everything this manager creates.  It is created
    # lazily by top_level_tmp_dir() and reset to None by cleanup().
    _temp_dir: Optional[Path] = None
    # Name prefix used when the top-level directory is created.
    _prefix: Optional[str]

    def __init__(self, prefix: Optional[str] = "eden_test.") -> None:
        """Remember the prefix to use for the top-level directory name."""
        self._prefix = prefix

    def __enter__(self) -> "TempFileManager":
        return self

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_value: Optional[BaseException],
        tb: Optional[types.TracebackType],
    ) -> None:
        # An exception type being present means the `with` body failed.
        self.cleanup(exc_type is not None)

    def cleanup(self, failure: bool = False) -> None:
        """Remove the top-level temporary directory, if one was created.

        The EDEN_TEST_CLEANUP environment variable (default "always", compared
        case-insensitively) can disable the removal: "0", "no" and "false" always
        leave the directory behind, and "success-only" leaves it behind when
        `failure` is true.  Either way the manager forgets the directory, so the
        next request for a temporary path creates a new top-level directory.

        Args:
            failure: Whether the work that used the temporary files failed.
        """
        temp_dir = self._temp_dir
        if temp_dir is None:
            return

        cleanup_mode = os.environ.get("EDEN_TEST_CLEANUP", "always").lower()
        if cleanup_mode in ("0", "no", "false") or (
            failure and cleanup_mode == "success-only"
        ):
            print(f"Leaving behind eden test directory {temp_dir}")
        else:
            cleanup_tmp_dir(temp_dir)
        self._temp_dir = None

    def make_temp_dir(self, prefix: Optional[str] = None) -> Path:
        """Create a new directory inside the top-level directory and return its path."""
        top_level = self.top_level_tmp_dir()
        path_str = tempfile.mkdtemp(prefix=prefix, dir=str(top_level))
        return Path(path_str)

    def make_temp_file(
        self, prefix: Optional[str] = None, mode: str = "r+"
    ) -> "TemporaryTextFile":
        """Create a new file inside the top-level directory, opened as UTF-8 text.

        Args:
            prefix: Optional file name prefix passed to tempfile.mkstemp().
            mode: Mode used to open the file object.

        Returns:
            A TemporaryTextFile wrapping the open file object and its path.
        """
        top_level = self.top_level_tmp_dir()
        fd, path_str = tempfile.mkstemp(prefix=prefix, dir=str(top_level))
        file_obj = self._fdopen_or_close(fd, mode, encoding="utf-8")
        # pyre-fixme[6]: Expected `TextIO` for 1st param but got `IO[typing.Any]`.
        return TemporaryTextFile(file_obj, Path(path_str))

    def make_temp_binary(
        self, prefix: Optional[str] = None, mode: str = "rb+"
    ) -> "TemporaryBinaryFile":
        """Create a new file inside the top-level directory, opened in binary mode.

        Args:
            prefix: Optional file name prefix passed to tempfile.mkstemp().
            mode: Mode used to open the file object.

        Returns:
            A TemporaryBinaryFile wrapping the open file object and its path.
        """
        top_level = self.top_level_tmp_dir()
        fd, path_str = tempfile.mkstemp(prefix=prefix, dir=str(top_level))
        file_obj = self._fdopen_or_close(fd, mode)
        # pyre-fixme[6]: Expected `BinaryIO` for 1st param but got `IO[typing.Any]`.
        return TemporaryBinaryFile(file_obj, Path(path_str))

    def top_level_tmp_dir(self) -> Path:
        """Return the top-level directory, creating it on first use.

        The path is resolved so that it does not contain symlinks.
        """
        top = self._temp_dir
        if top is None:
            top = Path(tempfile.mkdtemp(prefix=self._prefix)).resolve()
            self._temp_dir = top
        return top

    def set_tmp_prefix(self, prefix: str) -> None:
        """Change the top-level directory prefix.

        This has no effect other than logging a warning if the top-level directory
        has already been created.
        """
        if self._temp_dir is not None:
            logging.warning(
                f"cannot update temporary directory prefix to {prefix}: "
                f"temporary directory {self._temp_dir} was already created"
            )
            return
        self._prefix = prefix

    @staticmethod
    def _fdopen_or_close(
        fd: int, mode: str, encoding: Optional[str] = None
    ) -> "typing.IO[Any]":
        """Wrap `fd` in a file object, closing `fd` if that fails.

        Without this, a rejected `mode` would leak the descriptor returned by
        tempfile.mkstemp(), since nothing else holds a reference to it.
        """
        try:
            return os.fdopen(fd, mode, encoding=encoding)
        except BaseException:
            try:
                os.close(fd)
            except OSError:
                # The descriptor may already have been closed while the failed
                # open was being torn down; there is nothing more to release.
                pass
            raise
IOType = TypeVar("IOType", TextIO, BinaryIO)
T = TypeVar("T")


class TemporaryFileBase(Generic[IOType]):
    """This class is largely equivalent to tempfile.NamedTemporaryFile,
    but it also works on Windows. (The standard library NamedTemporaryFile class
    creates files that cannot be opened by other processes.)

    Attributes that are not defined on this wrapper are looked up on the wrapped
    file object, so the wrapper can be used like the file itself.

    We don't have any logic for deleting the file here since the entire containing
    directory will eventually be removed by TempFileManager. Using the object as a
    context manager closes the underlying file object on exit.
    """

    # The wrapped, already-open file object.
    file: IOType
    # String form of `path`.
    name: str
    # Location of the file on disk.
    path: Path

    def __init__(self, file: IOType, path: Path) -> None:
        """Wrap an open file object together with the path it was opened from."""
        self.file = file
        self.path = path
        self.name = str(path)

    def __getattr__(self, name: str) -> Any:  # pyre-fixme[3]
        """Delegate lookups of unknown attributes to the wrapped file object.

        Raises:
            AttributeError: If `name` is one of this wrapper's own fields that has
                not been set, if no file object is attached yet, or if the file
                object has no such attribute.
        """
        # __getattr__ only runs after normal lookup has failed, so getting here
        # for one of our own fields means it was never assigned. Report that as
        # AttributeError (not KeyError) so hasattr(), copy and pickle behave.
        if name in ("file", "name", "path") or "file" not in self.__dict__:
            raise AttributeError(
                f"{type(self).__name__!r} object has no attribute {name!r}"
            )

        value = getattr(self.__dict__["file"], name)
        # Cache methods so later calls skip __getattr__, but never cache plain
        # values such as `closed`: they change over the file's lifetime and a
        # cached copy would go stale.
        if callable(value):
            setattr(self, name, value)
        return value

    def __enter__(self: T) -> T:
        return self

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_value: Optional[BaseException],
        tb: Optional[types.TracebackType],
    ) -> None:
        self.file.close()
class TemporaryTextFile(TemporaryFileBase[TextIO]):
    """TemporaryFileBase specialization whose wrapped file object is a TextIO.

    TempFileManager.make_temp_file() returns instances of this class.
    """
class TemporaryBinaryFile(TemporaryFileBase[BinaryIO]):
    """TemporaryFileBase specialization whose wrapped file object is a BinaryIO.

    TempFileManager.make_temp_binary() returns instances of this class.
    """
class TemporaryDirectoryMixin(metaclass=abc.ABCMeta):
    """Mixin that hands out temporary directories and schedules their removal.

    The class using this mixin has to provide a working addCleanup(); the
    definition in this mixin only raises NotImplementedError.
    """

    # Defined on the class, so this one manager is shared by everything that does
    # not assign its own `temp_file_manager`.
    temp_file_manager: TempFileManager = TempFileManager()
    # Set on the instance once the manager's cleanup has been registered.
    _temp_cleanup_added: bool = False

    def make_temporary_directory(self, prefix: Optional[str] = None) -> str:
        """Create a new temporary directory and return its path as a string.

        The manager's cleanup is registered through addCleanup() before the
        directory is created.
        """
        self._ensure_temp_cleanup()
        new_dir = self.temp_file_manager.make_temp_dir(prefix=prefix)
        return str(new_dir)

    def _ensure_temp_cleanup(self) -> None:
        """Register the manager's cleanup() via addCleanup(), at most once."""
        if self._temp_cleanup_added:
            return
        self.addCleanup(self.temp_file_manager.cleanup)
        self._temp_cleanup_added = True

    def addCleanup(
        self, function: Callable[[], None], *args: Any, **kwargs: Any
    ) -> None:
        """Placeholder for the cleanup-registration hook; always raises here."""
        raise NotImplementedError()