mirror of
https://github.com/facebook/sapling.git
synced 2025-01-04 03:06:30 +03:00
740d414184
Summary: On Windows, this has the benefit of automatically converting \r\n into \n, which allows more tests to pass. Reviewed By: chadaustin Differential Revision: D22871408 fbshipit-source-id: 02ec1d21dc236175c3b0f3176db9b8c91dee21a4
390 lines
14 KiB
Python
390 lines
14 KiB
Python
#!/usr/bin/env python3
|
|
# Copyright (c) Facebook, Inc. and its affiliates.
|
|
#
|
|
# This software may be used and distributed according to the terms of the
|
|
# GNU General Public License version 2.
|
|
|
|
import json
|
|
import os
|
|
import subprocess
|
|
import sys
|
|
from pathlib import Path
|
|
from textwrap import dedent
|
|
from typing import Optional, Sequence, Set
|
|
|
|
from eden.integration.lib.hgrepo import HgRepository
|
|
|
|
from .lib import edenclient, testcase
|
|
from .lib.fake_edenfs import get_fake_edenfs_argv
|
|
from .lib.find_executables import FindExe
|
|
from .lib.service_test_case import (
|
|
ServiceTestCaseBase,
|
|
SystemdServiceTest,
|
|
service_test,
|
|
systemd_test,
|
|
)
|
|
|
|
|
|
@testcase.eden_repo_test
|
|
class CloneTest(testcase.EdenRepoTest):
|
|
def populate_repo(self) -> None:
|
|
self.repo.write_file("hello", "hola\n")
|
|
self.repo.commit("Initial commit.")
|
|
|
|
def test_clone_to_non_existent_directory(self) -> None:
|
|
tmp = self.make_temporary_directory()
|
|
non_existent_dir = os.path.join(tmp, "foo/bar/baz")
|
|
|
|
self.eden.run_cmd("clone", self.repo.path, non_existent_dir)
|
|
self.assertTrue(
|
|
os.path.isfile(os.path.join(non_existent_dir, "hello")),
|
|
msg="clone should succeed in non-existent directory",
|
|
)
|
|
|
|
def test_clone_to_dir_under_symlink(self) -> None:
|
|
tmp = self.make_temporary_directory()
|
|
empty_dir = os.path.join(tmp, "foo/bar")
|
|
os.makedirs(empty_dir)
|
|
|
|
symlink_dir = os.path.join(tmp, "food")
|
|
os.symlink(os.path.join(tmp, "foo"), symlink_dir)
|
|
|
|
symlinked_target = os.path.join(symlink_dir, "bar")
|
|
|
|
self.eden.run_cmd("clone", self.repo.path, symlinked_target)
|
|
self.assertTrue(
|
|
os.path.isfile(os.path.join(empty_dir, "hello")),
|
|
msg="clone should succeed in empty directory",
|
|
)
|
|
|
|
with self.get_thrift_client() as client:
|
|
active_mount_points: Set[Optional[str]] = {
|
|
os.fsdecode(mount.mountPoint) for mount in client.listMounts()
|
|
}
|
|
self.assertIn(
|
|
empty_dir, active_mount_points, msg="mounted using the realpath"
|
|
)
|
|
|
|
self.eden.run_cmd("remove", "--yes", symlinked_target)
|
|
|
|
def test_clone_to_existing_empty_directory(self) -> None:
|
|
tmp = self.make_temporary_directory()
|
|
empty_dir = os.path.join(tmp, "foo/bar/baz")
|
|
os.makedirs(empty_dir)
|
|
|
|
self.eden.run_cmd("clone", self.repo.path, empty_dir)
|
|
self.assertTrue(
|
|
os.path.isfile(os.path.join(empty_dir, "hello")),
|
|
msg="clone should succeed in empty directory",
|
|
)
|
|
|
|
def test_clone_from_repo(self) -> None:
|
|
# Specify the source of the clone as an existing local repo rather than
|
|
# an alias for a config.
|
|
destination_dir = self.make_temporary_directory()
|
|
self.eden.run_cmd("clone", self.repo.path, destination_dir)
|
|
self.assertTrue(
|
|
os.path.isfile(os.path.join(destination_dir, "hello")),
|
|
msg="clone should succeed in empty directory",
|
|
)
|
|
|
|
def test_clone_with_arcconfig(self) -> None:
|
|
project_id = "special_project"
|
|
|
|
# Add an .arcconfig file to the repository
|
|
arcconfig_data = {
|
|
"project_id": project_id,
|
|
"conduit_uri": "https://phabricator.example.com/api/",
|
|
"phutil_libraries": {"foo": "foo/arcanist/"},
|
|
}
|
|
self.repo.write_file(".arcconfig", json.dumps(arcconfig_data) + "\n")
|
|
self.repo.commit("Add .arcconfig")
|
|
|
|
# Add a config alias for a repo with some bind mounts.
|
|
edenrc = os.path.join(self.home_dir, ".edenrc")
|
|
with open(edenrc, "w") as f:
|
|
f.write(
|
|
dedent(
|
|
f"""\
|
|
["repository {project_id}"]
|
|
path = "{self.repo.get_canonical_root()}"
|
|
type = "{self.repo.get_type()}"
|
|
|
|
["bindmounts {project_id}"]
|
|
mnt1 = "foo/stuff/build_output"
|
|
mnt2 = "node_modules"
|
|
"""
|
|
)
|
|
)
|
|
|
|
# Clone the repository using its path.
|
|
# We should find the config from the project_id field in
|
|
# the .arcconfig file
|
|
eden_clone = self.make_temporary_directory()
|
|
self.eden.run_cmd("clone", self.repo.path, eden_clone)
|
|
self.assertFalse(
|
|
os.path.isdir(os.path.join(eden_clone, "foo/stuff/build_output")),
|
|
msg="clone should not create bind mounts from legacy config",
|
|
)
|
|
self.assertFalse(
|
|
os.path.isdir(os.path.join(eden_clone, "node_modules")),
|
|
msg="clone should not create bind mounts from legacy config",
|
|
)
|
|
|
|
def test_clone_from_eden_repo(self) -> None:
|
|
# Create an Eden mount from the config alias.
|
|
eden_clone1 = self.make_temporary_directory()
|
|
self.eden.run_cmd("clone", self.repo.path, eden_clone1)
|
|
|
|
self.assertFalse(
|
|
os.path.isdir(os.path.join(eden_clone1, "tmp/bm1")),
|
|
msg="clone should not create bind mount from the legacy config",
|
|
)
|
|
|
|
# Clone the Eden clone! Note it should inherit its config.
|
|
eden_clone2 = self.make_temporary_directory()
|
|
self.eden.run_cmd(
|
|
"clone", "--rev", self.repo.get_head_hash(), eden_clone1, eden_clone2
|
|
)
|
|
|
|
def test_clone_with_valid_revision_cmd_line_arg_works(self) -> None:
|
|
tmp = self.make_temporary_directory()
|
|
target = os.path.join(tmp, "foo/bar/baz")
|
|
self.eden.run_cmd(
|
|
"clone", "--rev", self.repo.get_head_hash(), self.repo.path, target
|
|
)
|
|
self.assertTrue(
|
|
os.path.isfile(os.path.join(target, "hello")),
|
|
msg="clone should succeed with --snapshop arg.",
|
|
)
|
|
|
|
def test_clone_with_short_revision_cmd_line_arg_works(self) -> None:
|
|
tmp = self.make_temporary_directory()
|
|
target = os.path.join(tmp, "foo/bar/baz")
|
|
short = self.repo.get_head_hash()[:6]
|
|
self.eden.run_cmd("clone", "--rev", short, self.repo.path, target)
|
|
self.assertTrue(
|
|
os.path.isfile(os.path.join(target, "hello")),
|
|
msg="clone should succeed with short --snapshop arg.",
|
|
)
|
|
|
|
def test_clone_to_non_empty_directory_fails(self) -> None:
|
|
tmp = self.make_temporary_directory()
|
|
non_empty_dir = os.path.join(tmp, "foo/bar/baz")
|
|
os.makedirs(non_empty_dir)
|
|
with open(os.path.join(non_empty_dir, "example.txt"), "w") as f:
|
|
f.write("I am not empty.\n")
|
|
|
|
with self.assertRaises(subprocess.CalledProcessError) as context:
|
|
self.eden.run_cmd("clone", self.repo.path, non_empty_dir)
|
|
stderr = context.exception.stderr
|
|
self.assertRegex(
|
|
stderr,
|
|
"destination path .* is not empty",
|
|
msg="clone into non-empty dir should fail",
|
|
)
|
|
|
|
def test_clone_with_invalid_revision_cmd_line_arg_fails(self) -> None:
|
|
tmp = self.make_temporary_directory()
|
|
empty_dir = os.path.join(tmp, "foo/bar/baz")
|
|
os.makedirs(empty_dir)
|
|
|
|
with self.assertRaises(subprocess.CalledProcessError) as context:
|
|
self.eden.run_cmd("clone", self.repo.path, empty_dir, "--rev", "X")
|
|
stderr = context.exception.stderr
|
|
self.assertIn(
|
|
"unable to find hash for commit 'X': ",
|
|
stderr,
|
|
msg="passing invalid commit on cmd line should fail",
|
|
)
|
|
|
|
def test_clone_to_file_fails(self) -> None:
|
|
tmp = self.make_temporary_directory()
|
|
non_empty_dir = os.path.join(tmp, "foo/bar/baz")
|
|
os.makedirs(non_empty_dir)
|
|
file_in_directory = os.path.join(non_empty_dir, "example.txt")
|
|
with open(file_in_directory, "w") as f:
|
|
f.write("I am not empty.\n")
|
|
|
|
with self.assertRaises(subprocess.CalledProcessError) as context:
|
|
self.eden.run_cmd("clone", self.repo.path, file_in_directory)
|
|
stderr = context.exception.stderr
|
|
self.assertIn(
|
|
f"error: destination path {file_in_directory} is not a directory\n", stderr
|
|
)
|
|
|
|
def test_clone_to_non_existent_directory_that_is_under_a_file_fails(self) -> None:
|
|
tmp = self.make_temporary_directory()
|
|
non_existent_dir = os.path.join(tmp, "foo/bar/baz")
|
|
with open(os.path.join(tmp, "foo"), "w") as f:
|
|
f.write("I am not empty.\n")
|
|
|
|
with self.assertRaises(subprocess.CalledProcessError) as context:
|
|
self.eden.run_cmd("clone", self.repo.path, non_existent_dir)
|
|
stderr = context.exception.stderr
|
|
self.assertIn(
|
|
f"error: destination path {non_existent_dir} is not a directory\n", stderr
|
|
)
|
|
|
|
def test_attempt_clone_invalid_repo_path(self) -> None:
|
|
tmp = self.make_temporary_directory()
|
|
repo_path = "/this/directory/does/not/exist"
|
|
|
|
with self.assertRaises(edenclient.EdenCommandError) as context:
|
|
self.eden.run_cmd("clone", repo_path, tmp)
|
|
self.assertIn(
|
|
f"error: {repo_path!r} does not look like a valid hg or git repository\n",
|
|
context.exception.stderr,
|
|
)
|
|
|
|
def test_clone_should_start_daemon(self) -> None:
|
|
# Shut down Eden.
|
|
self.assertTrue(self.eden.is_healthy())
|
|
self.eden.shutdown()
|
|
self.assertFalse(self.eden.is_healthy())
|
|
|
|
# Check `eden list`.
|
|
list_output = self.eden.list_cmd_simple()
|
|
self.assertEqual(
|
|
{self.mount: "NOT_RUNNING"}, list_output, msg="Eden should have one mount."
|
|
)
|
|
|
|
extra_daemon_args = self.eden.get_extra_daemon_args()
|
|
|
|
# Verify that clone starts the daemon.
|
|
tmp = Path(self.make_temporary_directory())
|
|
clone_output = self.eden.run_cmd(
|
|
"clone",
|
|
"--daemon-binary",
|
|
FindExe.EDEN_DAEMON,
|
|
self.repo.path,
|
|
str(tmp),
|
|
"--daemon-args",
|
|
*extra_daemon_args,
|
|
)
|
|
self.exit_stack.callback(self.eden.run_cmd, "stop")
|
|
self.assertIn("Starting edenfs", clone_output)
|
|
self.assertTrue(self.eden.is_healthy(), msg="clone should start Eden.")
|
|
mount_points = {self.mount: "RUNNING", str(tmp): "RUNNING"}
|
|
self.assertEqual(
|
|
mount_points,
|
|
self.eden.list_cmd_simple(),
|
|
msg="Eden should have two mounts.",
|
|
)
|
|
self.assertEqual("hola\n", (tmp / "hello").read_text())
|
|
|
|
def test_custom_not_mounted_readme(self) -> None:
|
|
"""Test that "eden clone" creates a README file in the mount point directory
|
|
telling users what to do if their checkout is not currently mounted.
|
|
"""
|
|
# Write a custom readme file in our etc config directory
|
|
custom_readme_text = "If this is broken bug joe@example.com\n"
|
|
with open(os.path.join(self.etc_eden_dir, "NOT_MOUNTED_README.txt"), "w") as f:
|
|
f.write(custom_readme_text)
|
|
|
|
# Perform a clone
|
|
new_mount = Path(self.make_temporary_directory())
|
|
readme_path = new_mount / "README_EDEN.txt"
|
|
self.eden.run_cmd("clone", self.repo.path, str(new_mount))
|
|
self.assertEqual("hola\n", (new_mount / "hello").read_text())
|
|
self.assertFalse(os.path.exists(readme_path))
|
|
|
|
# Now unmount the checkout and make sure we see the readme
|
|
self.eden.run_cmd("unmount", str(new_mount))
|
|
self.assertFalse((new_mount / "hello").exists())
|
|
self.assertEqual(custom_readme_text, readme_path.read_text())
|
|
|
|
|
|
class CloneFakeEdenFSTestBase(ServiceTestCaseBase):
|
|
def setUp(self) -> None:
|
|
super().setUp()
|
|
self.eden_dir = Path(self.make_temporary_directory())
|
|
|
|
def make_dummy_hg_repo(self) -> HgRepository:
|
|
repo = HgRepository(path=self.make_temporary_directory())
|
|
repo.init()
|
|
repo.write_file("hello", "")
|
|
repo.commit("Initial commit.")
|
|
return repo
|
|
|
|
def spawn_clone(
|
|
self,
|
|
repo_path: Path,
|
|
mount_path: Path,
|
|
extra_args: Optional[Sequence[str]] = None,
|
|
) -> subprocess.CompletedProcess:
|
|
eden_cli: str = FindExe.EDEN_CLI
|
|
fake_edenfs: str = FindExe.FAKE_EDENFS
|
|
base_args = [
|
|
eden_cli,
|
|
"--config-dir",
|
|
str(self.eden_dir),
|
|
] + self.get_required_eden_cli_args()
|
|
clone_cmd = base_args + [
|
|
"clone",
|
|
"--daemon-binary",
|
|
fake_edenfs,
|
|
str(repo_path),
|
|
str(mount_path),
|
|
]
|
|
if extra_args:
|
|
clone_cmd.extend(extra_args)
|
|
|
|
proc = subprocess.run(
|
|
clone_cmd,
|
|
encoding="utf-8",
|
|
errors="replace",
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.PIPE,
|
|
)
|
|
|
|
stop_cmd = base_args + ["stop"]
|
|
self.exit_stack.callback(subprocess.call, stop_cmd)
|
|
|
|
# Pass through any output from the clone command
|
|
sys.stdout.write(proc.stdout)
|
|
sys.stderr.write(proc.stderr)
|
|
|
|
# Note that the clone operation will actually fail here, since we started
|
|
# fake_edenfs rather than the real edenfs daemon, and it can't mount checkouts.
|
|
# However we only care about testing that the daemon got started, so that's
|
|
# fine.
|
|
return proc
|
|
|
|
|
|
@service_test
|
|
class CloneFakeEdenFSTest(CloneFakeEdenFSTestBase):
|
|
def test_daemon_command_arguments_should_forward_to_edenfs(self) -> None:
|
|
repo = self.make_dummy_hg_repo()
|
|
mount_path = Path(self.make_temporary_directory())
|
|
|
|
extra_daemon_args = ["--allowExtraArgs", "hello world"]
|
|
self.spawn_clone(
|
|
repo_path=Path(repo.path),
|
|
mount_path=mount_path,
|
|
extra_args=["--daemon-args"] + extra_daemon_args,
|
|
)
|
|
|
|
argv = get_fake_edenfs_argv(self.eden_dir)
|
|
self.assertEqual(
|
|
argv[-len(extra_daemon_args) :],
|
|
extra_daemon_args,
|
|
f"fake_edenfs should have received arguments verbatim\nargv: {argv}",
|
|
)
|
|
|
|
|
|
@systemd_test
|
|
class CloneFakeEdenFSWithSystemdTest(SystemdServiceTest, CloneFakeEdenFSTestBase):
|
|
def test_clone_starts_systemd_service(self) -> None:
|
|
repo = self.make_dummy_hg_repo()
|
|
mount_path = Path(self.make_temporary_directory())
|
|
clone_process = self.spawn_clone(
|
|
repo_path=Path(repo.path), mount_path=mount_path
|
|
)
|
|
self.assertIn(
|
|
"edenfs daemon is not currently running. Starting edenfs...",
|
|
clone_process.stdout,
|
|
)
|
|
self.assertIn("Started edenfs", clone_process.stderr)
|
|
self.assert_systemd_service_is_active(eden_dir=self.eden_dir)
|