sapling/eden/integration/lib/edenclient.py
Zeyi (Rice) Fan d20657bfc4 integration: teach integration test to arrange real edenfsctl via environ
Reviewed By: xavierd

Differential Revision: D30819280

fbshipit-source-id: de14ccb13ddec8ce90b0fa7d2aa987ea50f14d43
2021-09-29 10:02:09 -07:00

712 lines
24 KiB
Python

#!/usr/bin/env python3
# Copyright (c) Facebook, Inc. and its affiliates.
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2.
import json
import logging
import os
import pathlib
import shlex
import shutil
import signal
import subprocess
import sys
import tempfile
import threading
from pathlib import Path
from types import TracebackType
from typing import Any, Dict, List, Optional, Union, cast, TextIO, Tuple
from eden.fs.cli import util
from eden.thrift.legacy import EdenClient, create_thrift_client
from facebook.eden.ttypes import MountState
from .find_executables import FindExe
# Seconds to wait for edenfs to become healthy after starting it.
# Two minutes is very generous, but 30 seconds is not enough on CI hosts
# and many-core machines under load.
EDENFS_START_TIMEOUT = 120
# Seconds to wait for the daemon process to exit after "edenfsctl stop" before
# EdenFS.shutdown() falls back to sending SIGKILL.
EDENFS_STOP_TIMEOUT = 240
class EdenFS(object):
    """Manages an instance of the EdenFS fuse server."""

    _eden_dir: Path

    def __init__(
        self,
        base_dir: Optional[Path] = None,
        eden_dir: Optional[Path] = None,
        etc_eden_dir: Optional[Path] = None,
        home_dir: Optional[Path] = None,
        logging_settings: Optional[Dict[str, str]] = None,
        extra_args: Optional[List[str]] = None,
        storage_engine: str = "memory",
    ) -> None:
        """
        Construct a new EdenFS object.

        By default, all of the state directories needed for the edenfs daemon will be
        created under the directory specified by base_dir. If base_dir is not
        specified, a temporary directory will be created. The temporary directory will
        be removed when cleanup() or __exit__() is called on the EdenFS object.

        Explicit locations for various state directories (eden_dir, etc_eden_dir,
        home_dir) can also be given, if desired. For instance, this allows an EdenFS
        object to be created for an existing eden state directory. Explicitly given
        directories are used as-is and are not created here.

        logging_settings maps log categories to levels and is passed to the daemon
        as a single --logging argument. extra_args are appended to the daemon
        arguments on every start. storage_engine is passed to the daemon as
        --local_storage_engine_unsafe.
        """
        if base_dir is None:
            self._base_dir = Path(tempfile.mkdtemp(prefix="eden_test."))
            self._cleanup_base_dir = True
        else:
            self._base_dir = base_dir
            self._cleanup_base_dir = False

        if eden_dir is None:
            self._eden_dir = self._base_dir / "eden"
            self._eden_dir.mkdir(exist_ok=True)
        else:
            self._eden_dir = eden_dir

        if etc_eden_dir is None:
            self._etc_eden_dir = self._base_dir / "etc_eden"
            self._etc_eden_dir.mkdir(exist_ok=True)
        else:
            self._etc_eden_dir = etc_eden_dir

        if home_dir is None:
            self._home_dir = self._base_dir / "home"
            self._home_dir.mkdir(exist_ok=True)
        else:
            self._home_dir = home_dir

        self._storage_engine = storage_engine
        self._logging_settings = logging_settings
        self._extra_args = extra_args
        # The "edenfsctl daemon" process started by spawn_nowait(); None when this
        # object has not started edenfs or has since shut it down.
        self._process: Optional[subprocess.Popen] = None

    @property
    def eden_dir(self) -> Path:
        """The state directory passed to edenfsctl as --config-dir."""
        return self._eden_dir

    @property
    def etc_eden_dir(self) -> Path:
        """The directory passed to edenfsctl as --etc-eden-dir."""
        return self._etc_eden_dir

    @property
    def home_dir(self) -> Path:
        """The directory passed to edenfsctl as --home-dir."""
        return self._home_dir

    @property
    def user_rc_path(self) -> Path:
        """Path of the per-user .edenrc file inside home_dir."""
        return self._home_dir / ".edenrc"

    @property
    def system_rc_path(self) -> Path:
        """Path of the edenfs.rc file inside etc_eden_dir."""
        return self._etc_eden_dir / "edenfs.rc"

    def __enter__(self) -> "EdenFS":
        return self

    def __exit__(
        self,
        exc_type: Optional[type],
        exc_value: Optional[BaseException],
        tb: Optional[TracebackType],
    ) -> bool:
        self.cleanup()
        # Never suppress an exception raised inside the with-block.
        return False

    def cleanup(self) -> None:
        """Stop the instance and clean up its temporary directories"""
        self.kill()
        if self._cleanup_base_dir:
            shutil.rmtree(self._base_dir, ignore_errors=True)

    def kill(self) -> None:
        """Stops and unmounts this instance.

        Does nothing if this object never started edenfs, or if the process it
        started has already exited.
        """
        process = self._process
        # Use poll() rather than reading process.returncode directly:
        # returncode is only filled in once the process has been polled or
        # waited on, so by itself it cannot detect a daemon that already died.
        if process is None or process.poll() is not None:
            return
        self.shutdown()

    def get_thrift_client(self, timeout: Optional[float] = None) -> EdenClient:
        """Create a thrift client for the daemon that uses this instance's eden_dir."""
        return create_thrift_client(str(self._eden_dir), timeout=timeout)

    def run_cmd(
        self,
        command: str,
        *args: str,
        cwd: Optional[str] = None,
        capture_stderr: bool = False,
        encoding: str = "utf-8",
    ) -> str:
        """
        Run the specified eden command and return its stdout.

        Args: The eden command name and any arguments to pass to it.
        Usage example: run_cmd('mount', 'my_eden_client')

        If capture_stderr is True, stderr is merged into the returned output.
        Otherwise it is captured separately and only reported on failure.

        Raises EdenCommandError (a subclass of subprocess.CalledProcessError) if
        eden exits unsuccessfully.
        """
        cmd, env = self.get_edenfsctl_cmd_env(command, *args)
        stderr = subprocess.STDOUT if capture_stderr else subprocess.PIPE
        # TODO(T37669726): Re-enable LSAN.
        env["LSAN_OPTIONS"] = "detect_leaks=0:verbosity=1:log_threads=1"
        try:
            completed_process = subprocess.run(
                cmd,
                stdout=subprocess.PIPE,
                stderr=stderr,
                check=True,
                cwd=cwd,
                env=env,
                encoding=encoding,
            )
        except subprocess.CalledProcessError as ex:
            # Re-raise our own exception type so we can include the error
            # output.
            raise EdenCommandError(ex) from None
        return completed_process.stdout

    def run_unchecked(
        self, command: str, *args: str, **kwargs: Any
    ) -> subprocess.CompletedProcess:
        """Run the specified eden command without checking its exit status.

        Args: The eden command name and any arguments to pass to it.
        Usage example: run_unchecked('mount', 'my_eden_client')

        Extra keyword arguments are forwarded to subprocess.run(). An "env"
        keyword is merged on top of the edenfsctl environment rather than
        replacing it.

        Returns a subprocess.CompletedProcess object.
        """
        cmd, edenfsctl_env = self.get_edenfsctl_cmd_env(command, *args)
        if "env" in kwargs:
            edenfsctl_env.update(kwargs["env"])
        kwargs["env"] = edenfsctl_env
        return subprocess.run(cmd, **kwargs)

    def get_edenfsctl_cmd_env(
        self, command: str, *args: str
    ) -> Tuple[List[str], Dict[str, str]]:
        """Combines the specified eden command args with the appropriate
        defaults.

        Args:
            command: the eden command
            *args: extra string arguments to the command

        Returns:
            A (cmd, env) tuple: the argument list to run edenfsctl, suitable for
            subprocess.Popen() or subprocess.run(), and the environment to run
            it with. The env dict is a fresh copy that callers may modify.
        """
        edenfsctl, env = FindExe.get_edenfsctl_env()
        # run_cmd() and run_unchecked() mutate the returned env. Copy it so those
        # changes cannot leak into whatever mapping FindExe returned.
        # NOTE(review): confirm whether FindExe shares that mapping between calls.
        env = dict(env)
        cmd = [
            edenfsctl,
            "--config-dir",
            str(self._eden_dir),
            "--etc-eden-dir",
            str(self._etc_eden_dir),
            "--home-dir",
            str(self._home_dir),
        ]
        cmd.append(command)
        cmd.extend(args)
        return cmd, env

    def wait_for_is_healthy(self, timeout: float = EDENFS_START_TIMEOUT) -> bool:
        """Wait up to `timeout` seconds for the started daemon to report healthy.

        Returns whether the final health status was healthy.
        """
        process = self._process
        assert process is not None
        health = util.wait_for_daemon_healthy(
            proc=process,
            config_dir=self._eden_dir,
            get_client=self.get_thrift_client,
            timeout=timeout,
        )
        return health.is_healthy()

    def start(
        self,
        timeout: float = EDENFS_START_TIMEOUT,
        takeover_from: Optional[int] = None,
        extra_args: Optional[List[str]] = None,
    ) -> None:
        """
        Run "eden daemon" to start the eden daemon and wait for it to become healthy.

        takeover_from: pid of a running daemon to take over from. The new daemon
            is started with --takeover, and this pid is excluded when waiting for
            health.
        extra_args: additional daemon arguments for this start only.
        """
        use_gdb = False
        if os.environ.get("EDEN_GDB"):
            use_gdb = True
            # Starting up under GDB takes longer than normal.
            # Allow an extra 90 seconds (for some reason GDB can take a very
            # long time to load symbol information, particularly on dynamically
            # linked builds).
            timeout += 90

        takeover = takeover_from is not None
        self.spawn_nowait(gdb=use_gdb, takeover=takeover, extra_args=extra_args)

        process = self._process
        assert process is not None
        util.wait_for_daemon_healthy(
            proc=process,
            config_dir=self._eden_dir,
            get_client=self.get_thrift_client,
            timeout=timeout,
            exclude_pid=takeover_from,
        )

    def get_extra_daemon_args(self) -> List[str]:
        """Build the daemon arguments placed after "--" on the edenfsctl command line.

        Returns a new list on each call, so callers may extend it.
        """
        extra_daemon_args: List[str] = [
            # Defaulting to 8 import processes is excessive when the test
            # framework runs tests on each CPU core.
            "--num_hg_import_threads",
            "2",
            "--local_storage_engine_unsafe",
            self._storage_engine,
            "--hgPath",
            FindExe.HG_REAL,
        ]

        privhelper = FindExe.EDEN_PRIVHELPER
        if privhelper is not None:
            extra_daemon_args.extend(["--privhelper_path", privhelper])

        if "SANDCASTLE" in os.environ:
            extra_daemon_args.append("--allowRoot")

        # Turn up the VLOG level for the fuse server so that errors are logged
        # with an explanation when they bubble up to RequestData::catchErrors
        logging_settings = self._logging_settings
        if logging_settings:
            logging_arg = ",".join(
                "%s=%s" % (module, level)
                for module, level in sorted(logging_settings.items())
            )
            extra_daemon_args.extend(["--logging=" + logging_arg])

        extra_args = self._extra_args
        if extra_args:
            extra_daemon_args.extend(extra_args)

        return extra_daemon_args

    def spawn_nowait(
        self,
        gdb: bool = False,
        takeover: bool = False,
        extra_args: Optional[List[str]] = None,
    ) -> None:
        """
        Start edenfs but do not wait for it to become healthy.

        gdb: run the daemon under gdb. Setting the EDEN_GDB environment
            variable also enables this; either one is sufficient.
        takeover: pass --takeover to edenfsctl.
        extra_args: daemon arguments appended after get_extra_daemon_args().

        The daemon's stdout/stderr are forwarded to sys.stdout/sys.stderr by
        background daemon threads.

        Raises Exception if this object has already started edenfs.
        """
        if self._process is not None:
            raise Exception("cannot start an already-running eden client")

        args, env = self.get_edenfsctl_cmd_env(
            "daemon", "--daemon-binary", FindExe.EDEN_DAEMON, "--foreground"
        )

        extra_daemon_args = self.get_extra_daemon_args()
        if extra_args:
            extra_daemon_args.extend(extra_args)

        if takeover:
            args.append("--takeover")

        # Run eden inside gdb when requested, via the gdb parameter or the
        # EDEN_GDB environment variable, so a developer can debug crashes.
        if gdb or os.environ.get("EDEN_GDB"):
            gdb_exit_handler = (
                "python gdb.events.exited.connect("
                "lambda event: "
                'gdb.execute("quit") if getattr(event, "exit_code", None) == 0 '
                "else False"
                ")"
            )
            gdb_args = [
                # Register a handler to exit gdb if the program finishes
                # successfully.
                "-ex",
                gdb_exit_handler,
                # Start the program immediately when gdb starts
                "-ex",
                "run",
            ]
            args.append("--gdb")
            for arg in gdb_args:
                args.append("--gdb-arg=" + arg)

        if "EDEN_DAEMON_ARGS" in os.environ:
            args.extend(shlex.split(os.environ["EDEN_DAEMON_ARGS"]))

        full_args = args + ["--"] + extra_daemon_args
        logging.info(
            "Invoking eden daemon: %s", " ".join(shlex.quote(arg) for arg in full_args)
        )
        process = subprocess.Popen(
            full_args,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            universal_newlines=True,
            env=env,
        )

        # TODO(T69605343): Until TPX properly knows how to redirect writes done
        # to filedescriptors directly, we need to manually redirect EdenFS logs
        # to sys.std{out,err}.
        def redirect_stream(input_stream: TextIO, output_stream: TextIO) -> None:
            # readline() returns "" only at EOF, i.e. once the child closes the pipe.
            for line in iter(input_stream.readline, ""):
                output_stream.write(line)
            input_stream.close()

        threading.Thread(
            target=redirect_stream, args=(process.stdout, sys.stdout), daemon=True
        ).start()
        threading.Thread(
            target=redirect_stream, args=(process.stderr, sys.stderr), daemon=True
        ).start()

        self._process = process

    def shutdown(self) -> None:
        """
        Run "edenfsctl stop" to stop the eden daemon, then wait for it to exit.

        If it does not exit within EDENFS_STOP_TIMEOUT seconds, it is sent SIGKILL
        and an Exception is raised. An Exception is also raised if it exits with
        a non-zero status.
        """
        process = self._process
        assert process is not None

        # Before shutting down, get the current pid. This may differ from process.pid when
        # edenfs is started with sudo.
        daemon_pid = util.check_health(
            self.get_thrift_client, self.eden_dir, timeout=30
        ).pid

        # Run "edenfsctl stop" with a timeout of 0 to tell it not to wait for the EdenFS
        # process to exit. Since we are running it directly (self._process) we will
        # need to wait on it. Depending on exactly how it is being run the process may
        # not go away until we wait on it.
        self.run_cmd("stop", "-t", "0")

        self._process = None
        try:
            return_code = process.wait(timeout=EDENFS_STOP_TIMEOUT)
        except subprocess.TimeoutExpired:
            # EdenFS did not exit normally on its own.
            if can_run_sudo() and daemon_pid is not None:
                os.kill(daemon_pid, signal.SIGKILL)
            else:
                process.kill()
            process.wait(timeout=10)
            raise Exception(
                f"edenfs did not shutdown within {EDENFS_STOP_TIMEOUT} seconds; "
                "had to send SIGKILL"
            )

        if return_code != 0:
            raise Exception(
                "eden exited unsuccessfully with status {}".format(return_code)
            )

    def restart(self) -> None:
        """Shut the daemon down and start it again (not a graceful takeover)."""
        self.shutdown()
        self.start()

    def get_pid_via_thrift(self) -> int:
        """Return the daemon's pid as reported by getDaemonInfo() over thrift."""
        with self.get_thrift_client() as client:
            return client.getDaemonInfo().pid

    def graceful_restart(self, timeout: float = EDENFS_START_TIMEOUT) -> None:
        """Start a new daemon that takes over from the running one.

        Raises Exception if the old process exits with a non-zero status.
        """
        old_process = self._process
        assert old_process is not None

        # Get the process ID of the old edenfs process.
        # Note that this is not necessarily self._process.pid, since the eden
        # CLI may have spawned eden using sudo, and self._process may refer to
        # a sudo parent process.
        old_pid = self.get_pid_via_thrift()

        self._process = None
        self.start(timeout=timeout, takeover_from=old_pid)

        # Check the return code from the old edenfs process
        return_code = old_process.wait()
        if return_code != 0:
            raise Exception(
                "eden exited unsuccessfully with status {}".format(return_code)
            )

    def run_takeover_tool(self, cmd: List[str]) -> None:
        """Run a takeover tool command that is expected to stop the running daemon.

        Raises subprocess.CalledProcessError if the tool fails, or Exception if
        the old daemon process then exits with a non-zero status.
        """
        old_process = self._process
        assert old_process is not None

        subprocess.check_call(cmd)

        self._process = None
        return_code = old_process.wait()
        if return_code != 0:
            raise Exception(
                f"eden exited unsuccessfully with status {return_code} "
                "after a fake takeover stop"
            )

    def stop_with_stale_mounts(self) -> None:
        """Stop edenfs without unmounting any of its mount points.

        This will leave the mount points mounted but no longer connected to a FUSE
        daemon. Attempts to access files or directories inside the mount will fail with
        an ENOTCONN error after this.
        """
        cmd: List[str] = [FindExe.TAKEOVER_TOOL, "--edenDir", str(self._eden_dir)]
        self.run_takeover_tool(cmd)

    def fake_takeover_with_version(self, version: int) -> None:
        """
        Execute a fake takeover to explicitly test downgrades and make sure
        output is as expected. Right now, this is used as a sanity check to
        make sure we don't crash.
        """
        cmd: List[str] = [
            FindExe.TAKEOVER_TOOL,
            "--edenDir",
            str(self._eden_dir),
            "--takeoverVersion",
            str(version),
        ]
        self.run_takeover_tool(cmd)

    def takeover_without_ping_response(self) -> None:
        """
        Execute a fake takeover to explicitly test a failed takeover. The
        takeover client does not send a ping with the nosendPing flag,
        so the subprocess call will throw, and we expect the old process
        to recover
        """
        cmd: List[str] = [
            FindExe.TAKEOVER_TOOL,
            "--edenDir",
            str(self._eden_dir),
            "--noshouldPing",
        ]
        try:
            subprocess.check_call(cmd)
        except subprocess.CalledProcessError as ex:
            # We expect the new process to fail starting. Only a non-zero exit
            # is tolerated, so a missing or unrunnable tool is not silently
            # treated as a successful test.
            logging.info("takeover tool failed as expected: %s", ex)

    def list_cmd(self) -> Dict[str, Dict[str, Any]]:
        """
        Executes "eden list --json" to list the Eden checkouts and returns the result as
        a dictionary.
        """
        data = self.run_cmd("list", "--json")
        return cast(Dict[str, Dict[str, Any]], json.loads(data))

    def list_cmd_simple(self) -> Dict[str, str]:
        """
        Executes "eden list --json" to list the Eden checkouts and returns the result in
        a simplified format that can be more easily used in test case assertions.

        The result is a dictionary of { mount_path: status }
        The status is a string containing one of the MountState names, or "NOT_RUNNING"
        if the mount is not running. If the mount is known to the running edenfs
        instance but not listed in the configuration file, " (unconfigured)" will be
        appended to the status string.
        """
        results: Dict[str, str] = {}
        for path, mount_info in self.list_cmd().items():
            status_str = mount_info["state"]
            if not mount_info["configured"]:
                status_str += " (unconfigured)"
            results[path] = status_str
        return results

    def get_mount_state(
        self, mount: pathlib.Path, client: Optional[EdenClient] = None
    ) -> Optional[MountState]:
        """
        Query edenfs over thrift for the state of the specified mount.

        Returns the MountState enum, or None if edenfs does not currently know about
        this mount path. If no client is given, a temporary one is created.
        """
        if client is None:
            with self.get_thrift_client() as client:
                return self.get_mount_state(mount, client)

        for entry in client.listMounts():
            entry_path = pathlib.Path(os.fsdecode(entry.mountPoint))
            if entry_path == mount:
                return entry.state
        return None

    def clone(
        self,
        repo: str,
        path: Union[str, os.PathLike],
        allow_empty: bool = False,
    ) -> None:
        """
        Run "eden clone"
        """
        params = ["clone", repo, str(path)]
        if allow_empty:
            params.append("--allow-empty-repo")
        self.run_cmd(*params)

    def remove(self, path: str) -> None:
        """
        Run "eden remove <path>"
        """
        self.run_cmd("remove", "--yes", path)

    def in_proc_mounts(self, mount_path: str) -> bool:
        """Gets all eden mounts found in /proc/mounts, and returns
        true if this eden instance exists in list.
        """
        mount_path_bytes = mount_path.encode()
        with open("/proc/mounts", "rb") as f:
            for line in f:
                # /proc/mounts fields are space separated: device, mount point, ...
                fields = line.split(b" ")
                if len(fields) < 2:
                    continue
                if (
                    util.is_edenfs_mount_device(fields[0])
                    and fields[1] == mount_path_bytes
                ):
                    return True
        return False

    def is_healthy(self) -> bool:
        """Executes `eden health` and returns True if it exited with code 0."""
        cmd_result = self.run_unchecked("health")
        return cmd_result.returncode == 0

    def set_log_level(self, category: str, level: str) -> None:
        """Set the daemon's log level for `category` via the thrift setOption call."""
        with self.get_thrift_client() as client:
            client.setOption("logging", f"{category}={level}")

    def client_dir_for_mount(self, mount_path: pathlib.Path) -> pathlib.Path:
        """Resolve the mount's .eden/client symlink to its client directory."""
        client_link = mount_path / ".eden" / "client"
        return pathlib.Path(os.readlink(str(client_link)))

    def overlay_dir_for_mount(self, mount_path: pathlib.Path) -> pathlib.Path:
        """Return the "local" subdirectory of the mount's client directory."""
        return self.client_dir_for_mount(mount_path) / "local"

    def mount(self, mount_path: pathlib.Path) -> None:
        """Run "eden mount" for the given path."""
        self.run_cmd("mount", "--", str(mount_path))

    def unmount(self, mount_path: pathlib.Path) -> None:
        """Run "eden unmount" for the given path."""
        self.run_cmd("unmount", "--", str(mount_path))
class EdenCommandError(subprocess.CalledProcessError):
    """A CalledProcessError whose message includes the failing edenfsctl command
    and its error output, to make test failures easier to diagnose."""

    def __init__(self, ex: subprocess.CalledProcessError) -> None:
        super().__init__(ex.returncode, ex.cmd, output=ex.output, stderr=ex.stderr)

    def __str__(self) -> str:
        cmd = self.cmd
        if isinstance(cmd, str):
            # A shell-style command string; quoting it per character would garble it.
            cmd_str = cmd
        else:
            cmd_str = " ".join(shlex.quote(str(arg)) for arg in cmd)
        # When stderr was merged into stdout (run_cmd(capture_stderr=True)),
        # self.stderr is None and the error text lives in self.output instead.
        error_output = self.stderr if self.stderr is not None else self.output
        return (
            "edenfsctl command returned non-zero exit status %d\n\n"
            "Command:\n[%s]\n\nStderr:\n%s" % (self.returncode, cmd_str, error_output)
        )
# Module-level caches for can_run_eden(), can_run_fake_edenfs() and
# can_run_sudo(); None means the answer has not been computed yet.
_can_run_eden: Optional[bool] = None
_can_run_fake_edenfs: Optional[bool] = None
_can_run_sudo: Optional[bool] = None
def can_run_eden() -> bool:
    """
    Report whether the real EdenFS daemon can be run on this host.

    Integration tests consult this to decide whether to attempt running at
    all. The answer is computed on first use and cached in _can_run_eden.
    """
    global _can_run_eden
    if _can_run_eden is None:
        _can_run_eden = _compute_can_run_eden()
    return _can_run_eden
def can_run_fake_edenfs() -> bool:
    """
    Report whether the fake_edenfs helper program can be run on this host.

    Same checks as can_run_eden(), except /dev/fuse is not required. The
    answer is computed once and cached in _can_run_fake_edenfs.
    """
    global _can_run_fake_edenfs
    cached = _can_run_fake_edenfs
    if cached is not None:
        return cached
    cached = _compute_can_run_eden(require_fuse=False)
    _can_run_fake_edenfs = cached
    return cached
def _compute_can_run_eden(require_fuse: bool = True) -> bool:
if "SANDCASTLE" in os.environ and sys.platform != "darwin":
# On Sandcastle, pretend that we can always run EdenFS, this prevents
# blindspots where tests are suddenly skipped but still marked as
# passed.
# TODO(T100403433): The tests aren't compatible with macOS right now
# due to Sandcastle not running the job as root.
return True
if sys.platform == "win32":
# On Windows ProjectedFS must be installed.
# Our CMake configure step checks for the availability of ProjectedFSLib.lib
# so that we can link against ProjectedFS at build time, but this doesn't
# guarantee that ProjectedFS.dll is available.
projfs_dll = r"C:\Windows\system32\ProjectedFSLib.dll"
return os.path.exists(projfs_dll)
# FUSE must be available
if require_fuse and not os.path.exists("/dev/fuse"):
return False
# We must be able to start eden as root.
if os.geteuid() == 0:
return True
# The daemon must either be setuid root, or we must have sudo priviliges.
# Typically for the tests the daemon process is not setuid root,
# so check if we have are able to run things under sudo.
return can_run_sudo()
def can_run_sudo() -> bool:
    """Report whether sudo can run commands without prompting for input.

    The result of _compute_can_run_sudo() is cached in _can_run_sudo after the
    first call.
    """
    global _can_run_sudo
    if _can_run_sudo is not None:
        return _can_run_sudo
    result = _compute_can_run_sudo()
    _can_run_sudo = result
    return result
def _compute_can_run_sudo() -> bool:
if sys.platform == "win32":
return False
cmd = ["/usr/bin/sudo", "-E", "/bin/true"]
with open("/dev/null", "r") as dev_null:
# Close stdout, stderr, and stdin, and call setsid() to make
# sure we are detached from any controlling terminal. This makes
# sure that sudo can't prompt for a password if it needs one.
# sudo will only succeed if it can run with no user input.
process = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
stdin=dev_null,
preexec_fn=os.setsid,
)
process.communicate()
return process.returncode == 0