mirror of
https://github.com/facebook/sapling.git
synced 2024-10-10 08:47:12 +03:00
246415f23b
Summary: I think that the broken config (wrong relative base for search_path), was what prevented the upgrade from going automatically. Reviewed By: grievejia Differential Revision: D22966243 fbshipit-source-id: 4ef42a8e2e6f2c79483301c6876509a3009a83d1
229 lines
7.3 KiB
Python
229 lines
7.3 KiB
Python
#!/usr/bin/env python3
|
|
# Copyright (c) Facebook, Inc. and its affiliates.
|
|
#
|
|
# This software may be used and distributed according to the terms of the
|
|
# GNU General Public License version 2.
|
|
|
|
# pyre-strict
|
|
|
|
import abc
|
|
import logging
|
|
import os
|
|
import pathlib
|
|
import shutil
|
|
import tempfile
|
|
import types
|
|
import typing
|
|
from pathlib import Path
|
|
from typing import (
|
|
Any,
|
|
BinaryIO,
|
|
Callable,
|
|
Generic,
|
|
Optional,
|
|
TextIO,
|
|
Tuple,
|
|
Type,
|
|
TypeVar,
|
|
Union,
|
|
)
|
|
|
|
|
|
def cleanup_tmp_dir(tmp_dir: Path) -> None:
|
|
"""Clean up a temporary directory.
|
|
|
|
This is similar to shutil.rmtree() but also handles removing read-only files and
|
|
directories. This function changes the permissions on files and directories if this
|
|
is necessary to remove them.
|
|
|
|
This is necessary for removing Eden checkout directories since "eden clone" makes
|
|
the original mount point directory read-only.
|
|
"""
|
|
# If we encounter an EPERM or EACCESS error removing a file try making its parent
|
|
# directory writable and then retry the removal.
|
|
def _remove_readonly(
|
|
func: Callable[[Union[os.PathLike, str]], Any], # pyre-fixme[2]
|
|
path: Union[os.PathLike, str],
|
|
exc_info: Tuple[Type[BaseException], BaseException, types.TracebackType],
|
|
) -> None:
|
|
_ex_type, ex, _traceback = exc_info
|
|
if path == tmp_dir:
|
|
logging.warning(
|
|
f"failed to remove temporary test directory {tmp_dir}: {ex}"
|
|
)
|
|
return
|
|
if not isinstance(ex, PermissionError):
|
|
logging.warning(f"error removing file in temporary directory {path}: {ex}")
|
|
return
|
|
|
|
try:
|
|
# pyre-fixme[6]: Expected `_PathLike[AnyStr]` for 1st param but got
|
|
# `Union[_PathLike[Any], str]`.
|
|
parent_dir = os.path.dirname(path)
|
|
os.chmod(parent_dir, 0o755)
|
|
# func() is the function that failed.
|
|
# This is usually os.unlink() or os.rmdir().
|
|
func(path)
|
|
except OSError as ex:
|
|
logging.warning(f"error removing file in temporary directory {path}: {ex}")
|
|
return
|
|
|
|
shutil.rmtree(tmp_dir, onerror=_remove_readonly)
|
|
|
|
|
|
class TempFileManager:
|
|
"""TempFileManager exists for managing a set of temporary files and directories.
|
|
|
|
It creates all temporary files and directories in a single top-level directory,
|
|
which can later be cleaned up in one pass.
|
|
|
|
This helps make it a little easier to track temporary test artifacts while
|
|
debugging, and helps make it easier to identify when tests have failed to clean up
|
|
their temporary files.
|
|
|
|
This is also necessary on Windows because the standard tempfile.NamedTemporaryFile
|
|
class unfortunately does not work well there: the temporary files it creates cannot
|
|
be opened by other processes.
|
|
"""
|
|
|
|
_temp_dir: Optional[Path] = None
|
|
_prefix: Optional[str]
|
|
|
|
def __init__(self, prefix: Optional[str] = "eden_test.") -> None:
|
|
self._prefix = prefix
|
|
|
|
def __enter__(self) -> "TempFileManager":
|
|
return self
|
|
|
|
def __exit__(
|
|
self,
|
|
exc_type: Optional[Type[BaseException]],
|
|
exc_value: Optional[BaseException],
|
|
tb: Optional[types.TracebackType],
|
|
) -> None:
|
|
self.cleanup(exc_type is not None)
|
|
|
|
def cleanup(self, failure: bool = False) -> None:
|
|
temp_dir = self._temp_dir
|
|
if temp_dir is None:
|
|
return
|
|
|
|
cleanup_mode = os.environ.get("EDEN_TEST_CLEANUP", "always").lower()
|
|
if cleanup_mode in ("0", "no", "false") or (
|
|
failure and cleanup_mode == "success-only"
|
|
):
|
|
print(f"Leaving behind eden test directory {temp_dir}")
|
|
else:
|
|
cleanup_tmp_dir(temp_dir)
|
|
self._temp_dir = None
|
|
|
|
def make_temp_dir(self, prefix: Optional[str] = None) -> Path:
|
|
top_level = self.top_level_tmp_dir()
|
|
path_str = tempfile.mkdtemp(prefix=prefix, dir=str(top_level))
|
|
return Path(path_str)
|
|
|
|
def make_temp_file(
|
|
self, prefix: Optional[str] = None, mode: str = "r+"
|
|
) -> "TemporaryTextFile":
|
|
top_level = self.top_level_tmp_dir()
|
|
fd, path_str = tempfile.mkstemp(prefix=prefix, dir=str(top_level))
|
|
file_obj = os.fdopen(fd, mode, encoding="utf-8")
|
|
# pyre-fixme[6]: Expected `TextIO` for 1st param but got `IO[typing.Any]`.
|
|
return TemporaryTextFile(file_obj, Path(path_str))
|
|
|
|
def make_temp_binary(
|
|
self, prefix: Optional[str] = None, mode: str = "rb+"
|
|
) -> "TemporaryBinaryFile":
|
|
top_level = self.top_level_tmp_dir()
|
|
fd, path_str = tempfile.mkstemp(prefix=prefix, dir=str(top_level))
|
|
file_obj = os.fdopen(fd, mode)
|
|
# pyre-fixme[6]: Expected `BinaryIO` for 1st param but got `IO[typing.Any]`.
|
|
return TemporaryBinaryFile(file_obj, Path(path_str))
|
|
|
|
def top_level_tmp_dir(self) -> Path:
|
|
top = self._temp_dir
|
|
if top is None:
|
|
top = Path(tempfile.mkdtemp(prefix=self._prefix))
|
|
self._temp_dir = top
|
|
|
|
return top
|
|
|
|
def set_tmp_prefix(self, prefix: str) -> None:
|
|
if self._temp_dir is not None:
|
|
logging.warning(
|
|
f"cannot update temporary directory prefix to {prefix}: "
|
|
f"temporary directory {self._temp_dir} was already created"
|
|
)
|
|
return
|
|
self._prefix = prefix
|
|
|
|
|
|
IOType = TypeVar("IOType", TextIO, BinaryIO)
|
|
T = TypeVar("T")
|
|
|
|
|
|
class TemporaryFileBase(Generic[IOType]):
|
|
"""This class is largely equivalent to tempfile.NamedTemporaryFile,
|
|
but it also works on Windows. (The standard library NamedTemporaryFile class
|
|
creates files that cannot be opened by other processes.)
|
|
|
|
We don't have any logic for closing the file here since the entire containing
|
|
directory will eventually be removed by TempFileManager.
|
|
"""
|
|
|
|
file: IOType
|
|
name: str
|
|
path: Path
|
|
|
|
def __init__(self, file: IOType, path: Path) -> None:
|
|
self.file = file
|
|
self.path = path
|
|
self.name = str(path)
|
|
|
|
def __getattr__(self, name: str) -> Any: # pyre-fixme[3]
|
|
if name in ("name", "path"):
|
|
return self.__dict__[name]
|
|
else:
|
|
file = self.__dict__["file"]
|
|
value = getattr(file, name)
|
|
setattr(self, name, value)
|
|
return value
|
|
|
|
def __enter__(self: T) -> T:
|
|
return self
|
|
|
|
def __exit__(
|
|
self,
|
|
exc_type: Optional[Type[BaseException]],
|
|
exc_value: Optional[BaseException],
|
|
tb: Optional[types.TracebackType],
|
|
) -> None:
|
|
self.file.close()
|
|
|
|
|
|
class TemporaryTextFile(TemporaryFileBase[TextIO]):
|
|
pass
|
|
|
|
|
|
class TemporaryBinaryFile(TemporaryFileBase[BinaryIO]):
|
|
pass
|
|
|
|
|
|
class TemporaryDirectoryMixin(metaclass=abc.ABCMeta):
|
|
temp_file_manager: TempFileManager = TempFileManager()
|
|
_temp_cleanup_added: bool = False
|
|
|
|
def make_temporary_directory(self, prefix: Optional[str] = None) -> str:
|
|
self._ensure_temp_cleanup()
|
|
return str(self.temp_file_manager.make_temp_dir(prefix=prefix))
|
|
|
|
def _ensure_temp_cleanup(self) -> None:
|
|
if not self._temp_cleanup_added:
|
|
self.addCleanup(self.temp_file_manager.cleanup)
|
|
self._temp_cleanup_added = True
|
|
|
|
def addCleanup(
|
|
self, function: Callable[[], None], *args: Any, **kwargs: Any
|
|
) -> None:
|
|
raise NotImplementedError()
|