add --remote flag
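
Run evaluation and builds on another machine: the flake's source closure
is uploaded with `nix flake archive --to ssh://<host>`, nix-eval-jobs and
`nix build` run remotely over ssh (wrapped in `nix shell`, so the tools
don't have to be installed there), and results are fetched back with
`nix copy --no-check-sigs --from`. Usage:

    nix-ci-build --remote <host>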

Jörg Thalheim 2023-09-16 22:22:58 +02:00
parent ae50c356c2
commit 1f5042aa62
5 changed files with 419 additions and 191 deletions

.gitignore

@@ -1,4 +1,5 @@
result
result-*
# source: https://raw.githubusercontent.com/github/gitignore/main/Python.gitignore
# Byte-compiled / optimized / DLL files

bin/nix-ci-build

@@ -1,6 +1,7 @@
#!/usr/bin/env python3
import os
import sys
import asyncio
sys.path.insert(
0, os.path.join(os.path.dirname(os.path.dirname(os.path.realpath(__file__))))
@@ -9,5 +10,5 @@ sys.path.insert(
from nix_ci_build import main # NOQA
if __name__ == "__main__":
main()
asyncio.run(main())

default.nix

@@ -1,13 +1,13 @@
{ python3, makeWrapper, nix, nix-eval-jobs, nix-output-monitor, lib }:
{ python311, makeWrapper, nix, nix-eval-jobs, nix-output-monitor, lib, bashInteractive }:
let
path = lib.makeBinPath [ nix nix-eval-jobs nix-output-monitor ];
in
python3.pkgs.buildPythonApplication {
python311.pkgs.buildPythonApplication {
pname = "nix-ci-build";
version = "0.1.0";
format = "pyproject";
src = ./.;
buildInputs = with python3.pkgs; [ setuptools ];
buildInputs = with python311.pkgs; [ setuptools bashInteractive ];
nativeBuildInputs = [ makeWrapper ];
preFixup = ''
makeWrapperArgs+=(--prefix PATH : ${path})

nix_ci_build/__init__.py

@@ -1,15 +1,20 @@
import argparse
import asyncio
import json
import multiprocessing
import os
import select
import shlex
import shutil
import subprocess
import sys
import time
from contextlib import ExitStack, contextmanager
from abc import ABC
from asyncio import Queue, TaskGroup
from asyncio.subprocess import Process
from collections import defaultdict
from contextlib import AsyncExitStack, asynccontextmanager
from dataclasses import dataclass, field
from tempfile import TemporaryDirectory
from typing import IO, Any, Iterator, NoReturn
from typing import IO, Any, AsyncIterator, Coroutine, DefaultDict, NoReturn
def die(msg: str) -> NoReturn:
@@ -17,10 +22,26 @@ def die(msg: str) -> NoReturn:
sys.exit(1)
class Pipe:
def __init__(self) -> None:
fds = os.pipe()
self.read_file = os.fdopen(fds[0], "rb")
self.write_file = os.fdopen(fds[1], "wb")
def __enter__(self) -> "Pipe":
return self
def __exit__(self, _exc_type: Any, _exc_value: Any, _traceback: Any) -> None:
self.read_file.close()
self.write_file.close()
@dataclass
class Options:
flake: str = ""
flake_url: str = ""
flake_fragment: str = ""
options: list[str] = field(default_factory=list)
remote: str | None = None
systems: set[str] = field(default_factory=set)
eval_max_memory_size: int = 4096
skip_cached: bool = False
@@ -29,32 +50,48 @@ class Options:
retries: int = 0
verbose: bool = False
copy_to: str | None = None
nom: bool = False
@property
def remote_url(self) -> None | str:
if self.remote is None:
return None
return f"ssh://{self.remote}"
def run_nix(args: list[str]) -> subprocess.CompletedProcess[str]:
async def run_nix(args: list[str]) -> Process:
try:
proc = subprocess.run(["nix"] + args, text=True, capture_output=True)
proc = await asyncio.create_subprocess_exec(
"nix", *args, stdout=asyncio.subprocess.PIPE
)
except FileNotFoundError:
die("nix not found in PATH")
die(f"nix not found in PATH, try to run {shlex.join(args)}")
return proc
def current_system() -> str:
proc = run_nix(["eval", "--impure", "--raw", "--expr", "builtins.currentSystem"])
if proc.returncode != 0:
die(f"Failed to determine current system: {proc.stderr}")
return proc.stdout.strip()
async def get_nix_config() -> dict[str, str]:
proc = await run_nix(["show-config"])
assert proc.stdout is not None
config = {}
async for line in proc.stdout:
cols = line.split(b" = ", 1)
if len(cols) != 2:
continue
key, value = cols
config[key.decode()] = value.decode().strip()
try:
returncode = await proc.wait()
if returncode != 0:
die(f"Failed to get nix config: {returncode}")
finally:
if proc.returncode is None:
proc.kill()
return config
def max_jobs() -> int:
proc = run_nix(["show-config", "max-jobs"])
if proc.returncode != 0:
die(f"Failed to determine number of CPUs: {proc.stderr}")
return int(proc.stdout.strip())
def parse_args(args: list[str]) -> Options:
def parse_args(args: list[str], nix_config: dict[str, str]) -> Options:
parser = argparse.ArgumentParser()
parser.add_argument(
"-f",
"--flake",
@@ -65,7 +102,7 @@ def parse_args(args: list[str]) -> Options:
"-j",
"--max-jobs",
type=int,
default=max_jobs(),
        default=int(nix_config.get("max-jobs", 0)),
help="Maximum number of build jobs to run in parallel (0 for unlimited)",
)
parser.add_argument(
@@ -76,10 +113,19 @@ def parse_args(args: list[str]) -> Options:
metavar=("name", "value"),
default=[],
)
parser.add_argument(
"--no-nom",
help="Use nix-output-monitor to print build output (default: false)",
action="store_true",
default=shutil.which("nom") is None,
)
system = nix_config.get("system")
if system is None:
die("Failed to determine system from nix config")
parser.add_argument(
"--systems",
help="Comma-separated list of systems to build for (default: current system)",
default=current_system(),
default=system,
)
parser.add_argument(
"--retries",
@@ -87,6 +133,11 @@ def parse_args(args: list[str]) -> Options:
default=0,
help="Number of times to retry failed builds",
)
parser.add_argument(
"--remote",
type=str,
help="Remote machine to build on",
)
parser.add_argument(
"--skip-cached",
help="Skip builds that are already present in the binary cache (default: false)",
@@ -117,14 +168,23 @@ def parse_args(args: list[str]) -> Options:
a = parser.parse_args(args)
systems = set(a.systems.split(","))
flake_parts = a.flake.split("#")
flake_url = flake_parts[0]
flake_fragment = ""
if len(flake_parts) == 2:
flake_fragment = flake_parts[1]
options = []
for name, value in a.option:
options.extend(["--option", name, value])
return Options(
flake=a.flake,
flake_url=flake_url,
flake_fragment=flake_fragment,
remote=a.remote,
skip_cached=a.skip_cached,
options=options,
max_jobs=a.max_jobs,
nom=not a.no_nom,
verbose=a.verbose,
systems=systems,
eval_max_memory_size=a.eval_max_memory_size,
@@ -133,51 +193,108 @@ def parse_args(args: list[str]) -> Options:
)
@contextmanager
def nix_eval_jobs(opts: Options) -> Iterator[subprocess.Popen[str]]:
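# Copy the flake's source closure to the remote store; `nix flake archive
# --json` prints the store path of the archived tree, which replaces the
# flake URL for the remote side.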
def upload_sources(remote_url: str, flake_url: str) -> str:
cmd = [
"nix",
"flake",
"archive",
"--to",
remote_url,
"--json",
flake_url,
]
print("$ " + shlex.join(cmd))
proc = subprocess.run(cmd, stdout=subprocess.PIPE)
if proc.returncode != 0:
die(
f"failed to upload sources: {shlex.join(cmd)} failed with {proc.returncode}"
)
try:
return json.loads(proc.stdout)["path"]
except Exception as e:
die(
f"failed to parse output of {shlex.join(cmd)}: {e}\nGot: {proc.stdout.decode('utf-8', 'replace')}"
)
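# Wrap a command so it runs on the remote builder when --remote is given,
# e.g. ["nix", "build", ...] becomes ["ssh", "<host>", "--", "nix build ..."].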
def maybe_remote(cmd: list[str], opts: Options) -> list[str]:
if opts.remote:
return ["ssh", opts.remote, "--", shlex.join(cmd)]
else:
return cmd
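# Run a command through `nix shell` so the needed tools (nix-eval-jobs,
# nix-output-monitor) are available even when not installed on the remote;
# the actual command follows the "-c" flag.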
def nix_shell(packages: list[str]) -> list[str]:
return (
[
"nix",
"shell",
"--extra-experimental-features",
"nix-command",
"--extra-experimental-features",
"flakes",
]
+ packages
+ ["-c"]
)
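# Guarantee the subprocess is stopped when the context exits: terminate
# first, then kill if it hasn't exited within the timeout.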
@asynccontextmanager
async def ensure_stop(
proc: Process, cmd: list[str], timeout: float = 3.0
) -> AsyncIterator[Process]:
try:
yield proc
finally:
if proc.returncode is not None:
return
proc.terminate()
try:
await asyncio.wait_for(proc.wait(), timeout=timeout)
except asyncio.TimeoutError:
print(f"Failed to stop process {shlex.join(cmd)}. Killing it.")
proc.kill()
await proc.wait()
@asynccontextmanager
async def nix_eval_jobs(opts: Options) -> AsyncIterator[Process]:
with TemporaryDirectory() as d:
temp = d
if opts.remote:
# TODO: This is bad
temp = "/tmp/gc-roots"
args = [
"nix-eval-jobs",
"--gc-roots-dir",
d,
temp,
"--force-recurse",
"--max-memory-size",
str(opts.eval_max_memory_size),
"--workers",
str(opts.eval_workers),
"--flake",
opts.flake,
f"{opts.flake_url}#{opts.flake_fragment}",
] + opts.options
if opts.skip_cached:
args.append("--check-cache-status")
print("$ " + " ".join(args))
with subprocess.Popen(args, text=True, stdout=subprocess.PIPE) as proc:
try:
yield proc
finally:
proc.kill()
@contextmanager
def nix_build(
installable: str, stdout: IO[Any] | None, opts: Options
) -> Iterator[subprocess.Popen]:
log_format = "raw"
args = [
"nix",
"build",
installable,
"--log-format",
log_format,
"--keep-going",
] + opts.options
if opts.verbose:
print("$ " + " ".join(args))
with subprocess.Popen(args, text=True, stderr=stdout) as proc:
try:
if opts.remote:
args = nix_shell(["nixpkgs#nix-eval-jobs"]) + args
args = maybe_remote(args, opts)
print("$ " + shlex.join(args))
proc = await asyncio.create_subprocess_exec(*args, stdout=subprocess.PIPE)
async with ensure_stop(proc, args) as proc:
yield proc
finally:
proc.kill()
@asynccontextmanager
async def nix_output_monitor(fd: int, opts: Options) -> AsyncIterator[Process]:
cmd = maybe_remote(nix_shell(["nixpkgs#nix-output-monitor"]) + ["nom"], opts)
proc = await asyncio.create_subprocess_exec(*cmd, stdin=fd)
async with ensure_stop(proc, cmd) as proc:
yield proc
@dataclass
@@ -185,133 +302,127 @@ class Build:
attr: str
drv_path: str
outputs: dict[str, str]
proc: subprocess.Popen[str]
retries: int
rc: int | None = None
async def build(
self, stack: AsyncExitStack, build_output: IO[str], opts: Options
) -> int:
proc = await stack.enter_async_context(
nix_build(self.drv_path + "^*", build_output, opts)
)
rc = 0
for _ in range(opts.retries + 1):
rc = await proc.wait()
if rc == 0:
if opts.verbose:
print(f"build {self.attr} succeeded")
return rc
print(f"build {self.attr} exited with {rc}", file=sys.stderr)
return rc
async def nix_copy(
self, args: list[str], exit_stack: AsyncExitStack, opts: Options
) -> int:
cmd = maybe_remote(["nix", "copy", "--log-format", "raw"] + args, opts)
if opts.verbose:
print("$ " + shlex.join(cmd))
proc = await asyncio.create_subprocess_exec(*cmd)
await exit_stack.enter_async_context(ensure_stop(proc, cmd))
return await proc.wait()
async def upload(self, exit_stack: AsyncExitStack, opts: Options) -> int:
if not opts.copy_to:
return 0
cmd = ["nix", "copy", "--log-format", "raw", "--to", opts.copy_to] + list(
self.outputs.values()
)
cmd = maybe_remote(cmd, opts)
if opts.verbose:
print("$ " + shlex.join(cmd))
proc = await asyncio.create_subprocess_exec(*cmd)
await exit_stack.enter_async_context(ensure_stop(proc, cmd))
return await proc.wait()
async def download(self, exit_stack: AsyncExitStack, opts: Options) -> int:
if not opts.remote_url:
return 0
cmd = [
"nix",
"copy",
"--log-format",
"raw",
"--no-check-sigs",
"--from",
opts.remote_url,
] + list(self.outputs.values())
if opts.verbose:
print("$ " + shlex.join(cmd))
proc = await asyncio.create_subprocess_exec(*cmd)
await exit_stack.enter_async_context(ensure_stop(proc, cmd))
return await proc.wait()
@dataclass
class EvalError:
class Failure(ABC):
attr: str
error: str
error_message: str
def wait_for_any_build(builds: list[Build]) -> Build:
while True:
for i, build in enumerate(builds):
rc = build.proc.poll()
if rc is not None:
del builds[i]
build.rc = rc
return build
time.sleep(0.05)
class EvalFailure(Failure):
pass
def drain_builds(
builds: list[Build], stdout: IO[Any] | None, stack: ExitStack, opts: Options
) -> list[Build]:
build = wait_for_any_build(builds)
if build.rc != 0:
print(f"build {build.attr} exited with {build.rc}", file=sys.stderr)
if build.retries < opts.retries:
print(f"retrying build {build.attr} [{build.retries + 1}/{opts.retries}]")
builds.append(
create_build(
build.attr,
build.drv_path,
build.outputs,
stdout,
stack,
opts,
build.retries + 1,
)
)
else:
return [build]
return []
class BuildFailure(Failure):
pass
def create_build(
attr: str,
drv_path: str,
outputs: dict[str, str],
stdout: IO[Any] | None,
exit_stack: ExitStack,
opts: Options,
retries: int = 0,
) -> Build:
nix_build_proc = exit_stack.enter_context(nix_build(drv_path + "^*", stdout, opts))
if opts.copy_to:
if opts.verbose:
print(f"copying {attr} to {opts.copy_to}")
exit_stack.enter_context(
subprocess.Popen(
[
"nix",
"copy",
"--to",
opts.copy_to,
]
+ list(outputs.values()),
)
)
return Build(attr, drv_path, outputs, nix_build_proc, retries=retries)
class UploadFailure(Failure):
pass
class Pipe:
def __init__(self) -> None:
fds = os.pipe()
self.read_file = os.fdopen(fds[0], "rb")
self.write_file = os.fdopen(fds[1], "wb")
def __enter__(self) -> "Pipe":
return self
def __exit__(self, _exc_type: Any, _exc_value: Any, _traceback: Any) -> None:
self.read_file.close()
self.write_file.close()
class DownloadFailure(Failure):
pass
def stop_gracefully(proc: subprocess.Popen, timeout: int = 1) -> None:
proc.terminate()
try:
proc.wait(timeout=timeout)
except subprocess.TimeoutExpired:
proc.kill()
@contextmanager
def nix_output_monitor(fd: int) -> Iterator[subprocess.Popen]:
proc = subprocess.Popen(["nom"], stdin=fd)
@asynccontextmanager
async def nix_build(
installable: str, stderr: IO[Any] | None, opts: Options
) -> AsyncIterator[Process]:
args = [
"nix",
"build",
installable,
"--log-format",
"raw",
"--keep-going",
] + opts.options
args = maybe_remote(args, opts)
if opts.verbose:
print("$ " + shlex.join(args))
proc = await asyncio.create_subprocess_exec(*args, stderr=stderr)
try:
yield proc
finally:
stop_gracefully(proc)
proc.kill()
def run_builds(stack: ExitStack, opts: Options) -> int:
eval_error = []
build_failures = []
drv_paths = set()
proc = stack.enter_context(nix_eval_jobs(opts))
assert proc.stdout
pipe = stack.enter_context(Pipe())
nom_proc: subprocess.Popen | None = None
stdout = pipe.write_file
builds: list[Build] = []
for line in proc.stdout:
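# Producer: parse the JSON lines emitted by nix-eval-jobs and feed one
# (attr, drv_path, outputs) job per derivation into the build queue.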
async def run_evaluation(
eval_proc: Process,
    build_queue: Queue[tuple[str, str, dict[str, str]]],
failures: list[Failure],
opts: Options,
) -> None:
assert eval_proc.stdout
async for line in eval_proc.stdout:
if opts.verbose:
print(line, end="")
if nom_proc is None:
nom_proc = stack.enter_context(nix_output_monitor(pipe.read_file.fileno()))
try:
job = json.loads(line)
except json.JSONDecodeError:
die(f"Failed to parse line of nix-eval-jobs output: {line}")
die(f"Failed to parse line of nix-eval-jobs output: {line.decode()}")
error = job.get("error")
attr = job.get("attr", "unknown-flake-attribute")
if error:
eval_error.append(EvalError(attr, error))
failures.append(EvalFailure(attr, error))
continue
is_cached = job.get("isCached", False)
if is_cached:
@@ -321,48 +432,164 @@ def run_builds(stack: ExitStack, opts: Options) -> int:
continue
drv_path = job.get("drvPath")
if not drv_path:
die(f"nix-eval-jobs did not return a drvPath: {line}")
while len(builds) >= opts.max_jobs and opts.max_jobs != 0:
build_failures += drain_builds(builds, stdout, stack, opts)
die(f"nix-eval-jobs did not return a drvPath: {line.decode()}")
outputs = job.get("outputs", {})
build_queue.put_nowait((attr, drv_path, outputs))
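# Consumer: each of the opts.max_jobs worker tasks takes jobs off the
# build queue; successful builds are handed to the upload and download queues.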
async def run_builds(
stack: AsyncExitStack,
build_output: IO,
build_queue: Queue,
upload_queue: Queue,
download_queue: Queue,
failures: list[Failure],
opts: Options,
) -> NoReturn:
drv_paths: set[Any] = set()
while True:
attr, drv_path, outputs = await build_queue.get()
print(f" building {attr}")
        if drv_path in drv_paths:
            build_queue.task_done()
            continue
drv_paths.add(drv_path)
outputs = job.get("outputs", {})
builds.append(create_build(attr, drv_path, outputs, stdout, stack, opts))
build = Build(attr, drv_path, outputs)
rc = await build.build(stack, build_output, opts)
build_queue.task_done()
if rc == 0:
upload_queue.put_nowait(build)
download_queue.put_nowait(build)
else:
failures.append(BuildFailure(build.attr, f"build exited with {rc}"))
while builds:
build_failures += drain_builds(builds, stdout, stack, opts)
if nom_proc is not None:
stop_gracefully(nom_proc)
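# run_uploads and run_downloads drain their queues in the background:
# uploads push outputs to --copy-to, downloads fetch outputs back from the
# remote store via `nix copy --from`.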
async def run_uploads(
stack: AsyncExitStack,
upload_queue: Queue[Build],
failures: list[Failure],
opts: Options,
) -> NoReturn:
while True:
build = await upload_queue.get()
rc = await build.upload(stack, opts)
if rc != 0:
failures.append(UploadFailure(build.attr, f"upload exited with {rc}"))
upload_queue.task_done()
eval_rc = proc.wait()
if eval_rc != 0:
print(
f"nix-eval-jobs exited with {eval_rc}, check logs for details",
file=sys.stderr,
async def run_downloads(
stack: AsyncExitStack,
download_queue: Queue[Build],
failures: list[Failure],
opts: Options,
) -> NoReturn:
while True:
build = await download_queue.get()
rc = await build.download(stack, opts)
if rc != 0:
failures.append(DownloadFailure(build.attr, f"download exited with {rc}"))
download_queue.task_done()
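# Fallback progress output when nix-output-monitor is disabled: print the
# queue sizes whenever they change.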
async def report_progress(
build_queue: Queue,
upload_queue: Queue,
download_queue: Queue,
) -> NoReturn:
old_status = ""
while True:
new_status = f"builds: {build_queue.qsize()}, uploads: {upload_queue.qsize()}, downloads: {download_queue.qsize()}"
if new_status != old_status:
print(new_status)
old_status = new_status
await asyncio.sleep(0.5)
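# Wire everything together: start nix-eval-jobs (and optionally nom), then
# run the evaluation producer and the build/upload/download workers in a
# TaskGroup until all queues are drained.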
async def run(stack: AsyncExitStack, opts: Options) -> int:
eval_proc_future = stack.enter_async_context(nix_eval_jobs(opts))
pipe: Pipe | None = None
output_monitor_future: Coroutine[None, None, Process] | None = None
if opts.nom:
pipe = stack.enter_context(Pipe())
output_monitor_future = stack.enter_async_context(
nix_output_monitor(pipe.read_file.fileno(), opts)
)
eval_proc = await eval_proc_future
output_monitor: Process | None = None
if output_monitor_future:
output_monitor = await output_monitor_future
failures: DefaultDict[type, list[Failure]] = defaultdict(list)
    build_queue: Queue[tuple[str, str, dict[str, str]]] = Queue()
upload_queue: Queue[Build] = Queue()
download_queue: Queue[Build] = Queue()
for error in eval_error:
print(f"{error.attr}: {error.error}", file=sys.stderr)
evaluation = run_evaluation(eval_proc, build_queue, failures[EvalFailure], opts)
async with TaskGroup() as tg:
build_output = sys.stdout.buffer
if pipe:
build_output = pipe.write_file
tasks = []
for _ in range(opts.max_jobs):
tasks.append(
tg.create_task(
run_builds(
stack,
build_output,
build_queue,
upload_queue,
download_queue,
failures[BuildFailure],
opts,
)
)
)
tasks.append(
tg.create_task(
run_uploads(stack, upload_queue, failures[UploadFailure], opts)
)
)
tasks.append(
tg.create_task(
run_downloads(stack, download_queue, failures[DownloadFailure], opts)
)
)
if not opts.nom:
tasks.append(
tg.create_task(
report_progress(build_queue, upload_queue, download_queue)
)
)
await evaluation
await build_queue.join()
await upload_queue.join()
await download_queue.join()
for task in tasks:
task.cancel()
for build in build_failures:
print(f"{build.attr}: build failed with {build.rc}", file=sys.stderr)
for failure_type in [EvalFailure, BuildFailure, UploadFailure, DownloadFailure]:
for failure in failures[failure_type]:
print(
f"{failure_type.__name__} for {failure.attr}: {failure.error_message}"
)
if eval_proc.returncode != 0 and eval_proc.returncode is not None:
print(f"nix-eval-jobs exited with {eval_proc.returncode}")
if (
output_monitor
and output_monitor.returncode != 0
and output_monitor.returncode is not None
):
print(f"nix-output-monitor exited with {output_monitor.returncode}")
if len(build_failures) > 0 or len(eval_error) > 0 or eval_rc != 0:
return 1
else:
return 0
return 0
def main() -> None:
opts = parse_args(sys.argv[1:])
async def main() -> None:
nix_config = await get_nix_config()
opts = parse_args(sys.argv[1:], nix_config)
rc = 0
with ExitStack() as stack:
rc = run_builds(stack, opts)
async with AsyncExitStack() as stack:
if opts.remote_url:
opts.flake_url = upload_sources(opts.remote_url, opts.flake_url)
rc = await run(stack, opts)
sys.exit(rc)
if __name__ == "__main__":
    asyncio.run(main())

pyproject.toml

@@ -13,7 +13,6 @@ classifiers = [
"Environment :: Console",
"Topic :: Utilities",
"Intended Audience :: Developers",
"Programming Language :: Python :: 3.6",
]
[project.urls]
@@ -58,7 +57,7 @@ exclude = '''
'''
[tool.mypy]
python_version = "3.10"
python_version = "3.11"
warn_redundant_casts = true
disallow_untyped_calls = true
disallow_untyped_defs = true