From 1f5042aa627afa40fed953c7daeaaf31ed531874 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rg=20Thalheim?= Date: Sat, 16 Sep 2023 22:22:58 +0200 Subject: [PATCH] add --remote flag --- .gitignore | 1 + bin/nix-ci-build | 3 +- default.nix | 6 +- nix_ci_build/__init__.py | 597 +++++++++++++++++++++++++++------------ pyproject.toml | 3 +- 5 files changed, 419 insertions(+), 191 deletions(-) diff --git a/.gitignore b/.gitignore index c981805..687322e 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ result +result-* # source: https://raw.githubusercontent.com/github/gitignore/main/Python.gitignore # Byte-compiled / optimized / DLL files diff --git a/bin/nix-ci-build b/bin/nix-ci-build index 3f0d961..71aba2e 100755 --- a/bin/nix-ci-build +++ b/bin/nix-ci-build @@ -1,6 +1,7 @@ #!/usr/bin/env python3 import os import sys +import asyncio sys.path.insert( 0, os.path.join(os.path.dirname(os.path.dirname(os.path.realpath(__file__)))) @@ -9,5 +10,5 @@ sys.path.insert( from nix_ci_build import main # NOQA if __name__ == "__main__": - main() + asyncio.run(main()) diff --git a/default.nix b/default.nix index 93083c3..323233b 100644 --- a/default.nix +++ b/default.nix @@ -1,13 +1,13 @@ -{ python3, makeWrapper, nix, nix-eval-jobs, nix-output-monitor, lib }: +{ python311, makeWrapper, nix, nix-eval-jobs, nix-output-monitor, lib, bashInteractive }: let path = lib.makeBinPath [ nix nix-eval-jobs nix-output-monitor ]; in -python3.pkgs.buildPythonApplication { +python311.pkgs.buildPythonApplication { pname = "nix-ci-build"; version = "0.1.0"; format = "pyproject"; src = ./.; - buildInputs = with python3.pkgs; [ setuptools ]; + buildInputs = with python311.pkgs; [ setuptools bashInteractive ]; nativeBuildInputs = [ makeWrapper ]; preFixup = '' makeWrapperArgs+=(--prefix PATH : ${path}) diff --git a/nix_ci_build/__init__.py b/nix_ci_build/__init__.py index a50ab18..2f42865 100644 --- a/nix_ci_build/__init__.py +++ b/nix_ci_build/__init__.py @@ -1,15 +1,20 @@ import argparse +import asyncio import json import multiprocessing import os -import select +import shlex +import shutil import subprocess import sys -import time -from contextlib import ExitStack, contextmanager +from abc import ABC +from asyncio import Queue, TaskGroup +from asyncio.subprocess import Process +from collections import defaultdict +from contextlib import AsyncExitStack, asynccontextmanager from dataclasses import dataclass, field from tempfile import TemporaryDirectory -from typing import IO, Any, Iterator, NoReturn +from typing import IO, Any, AsyncIterator, Coroutine, DefaultDict, NoReturn def die(msg: str) -> NoReturn: @@ -17,10 +22,26 @@ def die(msg: str) -> NoReturn: sys.exit(1) +class Pipe: + def __init__(self) -> None: + fds = os.pipe() + self.read_file = os.fdopen(fds[0], "rb") + self.write_file = os.fdopen(fds[1], "wb") + + def __enter__(self) -> "Pipe": + return self + + def __exit__(self, _exc_type: Any, _exc_value: Any, _traceback: Any) -> None: + self.read_file.close() + self.write_file.close() + + @dataclass class Options: - flake: str = "" + flake_url: str = "" + flake_fragment: str = "" options: list[str] = field(default_factory=list) + remote: str | None = None systems: set[str] = field(default_factory=set) eval_max_memory_size: int = 4096 skip_cached: bool = False @@ -29,32 +50,48 @@ class Options: retries: int = 0 verbose: bool = False copy_to: str | None = None + nom: bool = False + + @property + def remote_url(self) -> None | str: + if self.remote is None: + return None + return f"ssh://{self.remote}" -def run_nix(args: list[str]) -> subprocess.CompletedProcess[str]: +async def run_nix(args: list[str]) -> Process: try: - proc = subprocess.run(["nix"] + args, text=True, capture_output=True) + proc = await asyncio.create_subprocess_exec( + "nix", *args, stdout=asyncio.subprocess.PIPE + ) except FileNotFoundError: - die("nix not found in PATH") + die(f"nix not found in PATH, try to run {shlex.join(args)}") return proc -def current_system() -> str: - proc = run_nix(["eval", "--impure", "--raw", "--expr", "builtins.currentSystem"]) - if proc.returncode != 0: - die(f"Failed to determine current system: {proc.stderr}") - return proc.stdout.strip() +async def get_nix_config() -> dict[str, str]: + proc = await run_nix(["show-config"]) + assert proc.stdout is not None + config = {} + async for line in proc.stdout: + cols = line.split(b" = ", 1) + if len(cols) != 2: + continue + key, value = cols + config[key.decode()] = value.decode().strip() + try: + returncode = await proc.wait() + if returncode != 0: + die(f"Failed to get nix config: {returncode}") + finally: + if proc.returncode is None: + proc.kill() + return config -def max_jobs() -> int: - proc = run_nix(["show-config", "max-jobs"]) - if proc.returncode != 0: - die(f"Failed to determine number of CPUs: {proc.stderr}") - return int(proc.stdout.strip()) - - -def parse_args(args: list[str]) -> Options: +def parse_args(args: list[str], nix_config: dict[str, str]) -> Options: parser = argparse.ArgumentParser() + parser.add_argument( "-f", "--flake", @@ -65,7 +102,7 @@ def parse_args(args: list[str]) -> Options: "-j", "--max-jobs", type=int, - default=max_jobs(), + default=nix_config.get("max-jobs", 0), help="Maximum number of build jobs to run in parallel (0 for unlimited)", ) parser.add_argument( @@ -76,10 +113,19 @@ def parse_args(args: list[str]) -> Options: metavar=("name", "value"), default=[], ) + parser.add_argument( + "--no-nom", + help="Use nix-output-monitor to print build output (default: false)", + action="store_true", + default=shutil.which("nom") is None, + ) + system = nix_config.get("system") + if system is None: + die("Failed to determine system from nix config") parser.add_argument( "--systems", help="Comma-separated list of systems to build for (default: current system)", - default=current_system(), + default=system, ) parser.add_argument( "--retries", @@ -87,6 +133,11 @@ def parse_args(args: list[str]) -> Options: default=0, help="Number of times to retry failed builds", ) + parser.add_argument( + "--remote", + type=str, + help="Remote machine to build on", + ) parser.add_argument( "--skip-cached", help="Skip builds that are already present in the binary cache (default: false)", @@ -117,14 +168,23 @@ def parse_args(args: list[str]) -> Options: a = parser.parse_args(args) systems = set(a.systems.split(",")) + flake_parts = a.flake.split("#") + flake_url = flake_parts[0] + flake_fragment = "" + if len(flake_parts) == 2: + flake_fragment = flake_parts[1] + options = [] for name, value in a.option: options.extend(["--option", name, value]) return Options( - flake=a.flake, + flake_url=flake_url, + flake_fragment=flake_fragment, + remote=a.remote, skip_cached=a.skip_cached, options=options, max_jobs=a.max_jobs, + nom=not a.no_nom, verbose=a.verbose, systems=systems, eval_max_memory_size=a.eval_max_memory_size, @@ -133,51 +193,108 @@ def parse_args(args: list[str]) -> Options: ) -@contextmanager -def nix_eval_jobs(opts: Options) -> Iterator[subprocess.Popen[str]]: +def upload_sources(remote_url: str, flake_url: str) -> str: + cmd = [ + "nix", + "flake", + "archive", + "--to", + remote_url, + "--json", + flake_url, + ] + print("$ " + shlex.join(cmd)) + proc = subprocess.run(cmd, stdout=subprocess.PIPE) + if proc.returncode != 0: + die( + f"failed to upload sources: {shlex.join(cmd)} failed with {proc.returncode}" + ) + try: + return json.loads(proc.stdout)["path"] + except Exception as e: + die( + f"failed to parse output of {shlex.join(cmd)}: {e}\nGot: {proc.stdout.decode('utf-8', 'replace')}" + ) + + +def maybe_remote(cmd: list[str], opts: Options) -> list[str]: + if opts.remote: + return ["ssh", opts.remote, "--", shlex.join(cmd)] + else: + return cmd + + +def nix_shell(packages: list[str]) -> list[str]: + return ( + [ + "nix", + "shell", + "--extra-experimental-features", + "nix-command", + "--extra-experimental-features", + "flakes", + ] + + packages + + ["-c"] + ) + + +@asynccontextmanager +async def ensure_stop( + proc: Process, cmd: list[str], timeout: float = 3.0 +) -> AsyncIterator[Process]: + try: + yield proc + finally: + if proc.returncode is not None: + return + proc.terminate() + try: + await asyncio.wait_for(proc.wait(), timeout=timeout) + except asyncio.TimeoutError: + print(f"Failed to stop process {shlex.join(cmd)}. Killing it.") + proc.kill() + await proc.wait() + + +@asynccontextmanager +async def nix_eval_jobs(opts: Options) -> AsyncIterator[Process]: with TemporaryDirectory() as d: + temp = d + + if opts.remote: + # TODO: This is bad + temp = "/tmp/gc-roots" + args = [ "nix-eval-jobs", "--gc-roots-dir", - d, + temp, "--force-recurse", "--max-memory-size", str(opts.eval_max_memory_size), "--workers", str(opts.eval_workers), "--flake", - opts.flake, + f"{opts.flake_url}#{opts.flake_fragment}", ] + opts.options if opts.skip_cached: args.append("--check-cache-status") - print("$ " + " ".join(args)) - with subprocess.Popen(args, text=True, stdout=subprocess.PIPE) as proc: - try: - yield proc - finally: - proc.kill() - - -@contextmanager -def nix_build( - installable: str, stdout: IO[Any] | None, opts: Options -) -> Iterator[subprocess.Popen]: - log_format = "raw" - args = [ - "nix", - "build", - installable, - "--log-format", - log_format, - "--keep-going", - ] + opts.options - if opts.verbose: - print("$ " + " ".join(args)) - with subprocess.Popen(args, text=True, stderr=stdout) as proc: - try: + if opts.remote: + args = nix_shell(["nixpkgs#nix-eval-jobs"]) + args + args = maybe_remote(args, opts) + print("$ " + shlex.join(args)) + proc = await asyncio.create_subprocess_exec(*args, stdout=subprocess.PIPE) + async with ensure_stop(proc, args) as proc: yield proc - finally: - proc.kill() + + +@asynccontextmanager +async def nix_output_monitor(fd: int, opts: Options) -> AsyncIterator[Process]: + cmd = maybe_remote(nix_shell(["nixpkgs#nix-output-monitor"]) + ["nom"], opts) + proc = await asyncio.create_subprocess_exec(*cmd, stdin=fd) + async with ensure_stop(proc, cmd) as proc: + yield proc @dataclass @@ -185,133 +302,127 @@ class Build: attr: str drv_path: str outputs: dict[str, str] - proc: subprocess.Popen[str] - retries: int - rc: int | None = None + + async def build( + self, stack: AsyncExitStack, build_output: IO[str], opts: Options + ) -> int: + proc = await stack.enter_async_context( + nix_build(self.drv_path + "^*", build_output, opts) + ) + rc = 0 + for _ in range(opts.retries + 1): + rc = await proc.wait() + if rc == 0: + if opts.verbose: + print(f"build {self.attr} succeeded") + return rc + print(f"build {self.attr} exited with {rc}", file=sys.stderr) + return rc + + async def nix_copy( + self, args: list[str], exit_stack: AsyncExitStack, opts: Options + ) -> int: + cmd = maybe_remote(["nix", "copy", "--log-format", "raw"] + args, opts) + if opts.verbose: + print("$ " + shlex.join(cmd)) + proc = await asyncio.create_subprocess_exec(*cmd) + await exit_stack.enter_async_context(ensure_stop(proc, cmd)) + return await proc.wait() + + async def upload(self, exit_stack: AsyncExitStack, opts: Options) -> int: + if not opts.copy_to: + return 0 + cmd = ["nix", "copy", "--log-format", "raw", "--to", opts.copy_to] + list( + self.outputs.values() + ) + cmd = maybe_remote(cmd, opts) + if opts.verbose: + print("$ " + shlex.join(cmd)) + proc = await asyncio.create_subprocess_exec(*cmd) + await exit_stack.enter_async_context(ensure_stop(proc, cmd)) + return await proc.wait() + + async def download(self, exit_stack: AsyncExitStack, opts: Options) -> int: + if not opts.remote_url: + return 0 + cmd = [ + "nix", + "copy", + "--log-format", + "raw", + "--no-check-sigs", + "--from", + opts.remote_url, + ] + list(self.outputs.values()) + if opts.verbose: + print("$ " + shlex.join(cmd)) + proc = await asyncio.create_subprocess_exec(*cmd) + await exit_stack.enter_async_context(ensure_stop(proc, cmd)) + return await proc.wait() @dataclass -class EvalError: +class Failure(ABC): attr: str - error: str + error_message: str -def wait_for_any_build(builds: list[Build]) -> Build: - while True: - for i, build in enumerate(builds): - rc = build.proc.poll() - if rc is not None: - del builds[i] - build.rc = rc - return build - time.sleep(0.05) +class EvalFailure(Failure): + pass -def drain_builds( - builds: list[Build], stdout: IO[Any] | None, stack: ExitStack, opts: Options -) -> list[Build]: - build = wait_for_any_build(builds) - if build.rc != 0: - print(f"build {build.attr} exited with {build.rc}", file=sys.stderr) - if build.retries < opts.retries: - print(f"retrying build {build.attr} [{build.retries + 1}/{opts.retries}]") - builds.append( - create_build( - build.attr, - build.drv_path, - build.outputs, - stdout, - stack, - opts, - build.retries + 1, - ) - ) - else: - return [build] - return [] +class BuildFailure(Failure): + pass -def create_build( - attr: str, - drv_path: str, - outputs: dict[str, str], - stdout: IO[Any] | None, - exit_stack: ExitStack, - opts: Options, - retries: int = 0, -) -> Build: - nix_build_proc = exit_stack.enter_context(nix_build(drv_path + "^*", stdout, opts)) - if opts.copy_to: - if opts.verbose: - print(f"copying {attr} to {opts.copy_to}") - exit_stack.enter_context( - subprocess.Popen( - [ - "nix", - "copy", - "--to", - opts.copy_to, - ] - + list(outputs.values()), - ) - ) - return Build(attr, drv_path, outputs, nix_build_proc, retries=retries) +class UploadFailure(Failure): + pass -class Pipe: - def __init__(self) -> None: - fds = os.pipe() - self.read_file = os.fdopen(fds[0], "rb") - self.write_file = os.fdopen(fds[1], "wb") - - def __enter__(self) -> "Pipe": - return self - - def __exit__(self, _exc_type: Any, _exc_value: Any, _traceback: Any) -> None: - self.read_file.close() - self.write_file.close() +class DownloadFailure(Failure): + pass -def stop_gracefully(proc: subprocess.Popen, timeout: int = 1) -> None: - proc.terminate() - try: - proc.wait(timeout=timeout) - except subprocess.TimeoutExpired: - proc.kill() - - -@contextmanager -def nix_output_monitor(fd: int) -> Iterator[subprocess.Popen]: - proc = subprocess.Popen(["nom"], stdin=fd) +@asynccontextmanager +async def nix_build( + installable: str, stderr: IO[Any] | None, opts: Options +) -> AsyncIterator[Process]: + args = [ + "nix", + "build", + installable, + "--log-format", + "raw", + "--keep-going", + ] + opts.options + args = maybe_remote(args, opts) + if opts.verbose: + print("$ " + shlex.join(args)) + proc = await asyncio.create_subprocess_exec(*args, stderr=stderr) try: yield proc finally: - stop_gracefully(proc) + proc.kill() -def run_builds(stack: ExitStack, opts: Options) -> int: - eval_error = [] - build_failures = [] - drv_paths = set() - proc = stack.enter_context(nix_eval_jobs(opts)) - assert proc.stdout - pipe = stack.enter_context(Pipe()) - nom_proc: subprocess.Popen | None = None - stdout = pipe.write_file - builds: list[Build] = [] - for line in proc.stdout: +async def run_evaluation( + eval_proc: Process, + build_queue: Queue[tuple[str, str, str]], + failures: list[Failure], + opts: Options, +) -> None: + assert eval_proc.stdout + async for line in eval_proc.stdout: if opts.verbose: print(line, end="") - if nom_proc is None: - nom_proc = stack.enter_context(nix_output_monitor(pipe.read_file.fileno())) try: job = json.loads(line) except json.JSONDecodeError: - die(f"Failed to parse line of nix-eval-jobs output: {line}") + die(f"Failed to parse line of nix-eval-jobs output: {line.decode()}") error = job.get("error") attr = job.get("attr", "unknown-flake-attribute") if error: - eval_error.append(EvalError(attr, error)) + failures.append(EvalFailure(attr, error)) continue is_cached = job.get("isCached", False) if is_cached: @@ -321,48 +432,164 @@ def run_builds(stack: ExitStack, opts: Options) -> int: continue drv_path = job.get("drvPath") if not drv_path: - die(f"nix-eval-jobs did not return a drvPath: {line}") - while len(builds) >= opts.max_jobs and opts.max_jobs != 0: - build_failures += drain_builds(builds, stdout, stack, opts) + die(f"nix-eval-jobs did not return a drvPath: {line.decode()}") + outputs = job.get("outputs", {}) + build_queue.put_nowait((attr, drv_path, outputs)) + + +async def run_builds( + stack: AsyncExitStack, + build_output: IO, + build_queue: Queue, + upload_queue: Queue, + download_queue: Queue, + failures: list[Failure], + opts: Options, +) -> NoReturn: + drv_paths: set[Any] = set() + + while True: + attr, drv_path, outputs = await build_queue.get() print(f" building {attr}") if drv_path in drv_paths: continue drv_paths.add(drv_path) - outputs = job.get("outputs", {}) - builds.append(create_build(attr, drv_path, outputs, stdout, stack, opts)) + build = Build(attr, drv_path, outputs) + rc = await build.build(stack, build_output, opts) + build_queue.task_done() + if rc == 0: + upload_queue.put_nowait(build) + download_queue.put_nowait(build) + else: + failures.append(BuildFailure(build.attr, f"build exited with {rc}")) - while builds: - build_failures += drain_builds(builds, stdout, stack, opts) - if nom_proc is not None: - stop_gracefully(nom_proc) +async def run_uploads( + stack: AsyncExitStack, + upload_queue: Queue[Build], + failures: list[Failure], + opts: Options, +) -> NoReturn: + while True: + build = await upload_queue.get() + rc = await build.upload(stack, opts) + if rc != 0: + failures.append(UploadFailure(build.attr, f"upload exited with {rc}")) + upload_queue.task_done() - eval_rc = proc.wait() - if eval_rc != 0: - print( - f"nix-eval-jobs exited with {eval_rc}, check logs for details", - file=sys.stderr, + +async def run_downloads( + stack: AsyncExitStack, + download_queue: Queue[Build], + failures: list[Failure], + opts: Options, +) -> NoReturn: + while True: + build = await download_queue.get() + rc = await build.download(stack, opts) + if rc != 0: + failures.append(DownloadFailure(build.attr, f"download exited with {rc}")) + download_queue.task_done() + + +async def report_progress( + build_queue: Queue, + upload_queue: Queue, + download_queue: Queue, +) -> NoReturn: + old_status = "" + while True: + new_status = f"builds: {build_queue.qsize()}, uploads: {upload_queue.qsize()}, downloads: {download_queue.qsize()}" + if new_status != old_status: + print(new_status) + old_status = new_status + await asyncio.sleep(0.5) + + +async def run(stack: AsyncExitStack, opts: Options) -> int: + eval_proc_future = stack.enter_async_context(nix_eval_jobs(opts)) + pipe: Pipe | None = None + output_monitor_future: Coroutine[None, None, Process] | None = None + if opts.nom: + pipe = stack.enter_context(Pipe()) + output_monitor_future = stack.enter_async_context( + nix_output_monitor(pipe.read_file.fileno(), opts) ) + eval_proc = await eval_proc_future + output_monitor: Process | None = None + if output_monitor_future: + output_monitor = await output_monitor_future + failures: DefaultDict[type, list[Failure]] = defaultdict(list) + build_queue: Queue[tuple[str, str, str]] = Queue() + upload_queue: Queue[Build] = Queue() + download_queue: Queue[Build] = Queue() - for error in eval_error: - print(f"{error.attr}: {error.error}", file=sys.stderr) + evaluation = run_evaluation(eval_proc, build_queue, failures[EvalFailure], opts) + async with TaskGroup() as tg: + build_output = sys.stdout.buffer + if pipe: + build_output = pipe.write_file + tasks = [] + for _ in range(opts.max_jobs): + tasks.append( + tg.create_task( + run_builds( + stack, + build_output, + build_queue, + upload_queue, + download_queue, + failures[BuildFailure], + opts, + ) + ) + ) + tasks.append( + tg.create_task( + run_uploads(stack, upload_queue, failures[UploadFailure], opts) + ) + ) + tasks.append( + tg.create_task( + run_downloads(stack, download_queue, failures[DownloadFailure], opts) + ) + ) + if not opts.nom: + tasks.append( + tg.create_task( + report_progress(build_queue, upload_queue, download_queue) + ) + ) + await evaluation + await build_queue.join() + await upload_queue.join() + await download_queue.join() + for task in tasks: + task.cancel() - for build in build_failures: - print(f"{build.attr}: build failed with {build.rc}", file=sys.stderr) + for failure_type in [EvalFailure, BuildFailure, UploadFailure, DownloadFailure]: + for failure in failures[failure_type]: + print( + f"{failure_type.__name__} for {failure.attr}: {failure.error_message}" + ) + if eval_proc.returncode != 0 and eval_proc.returncode is not None: + print(f"nix-eval-jobs exited with {eval_proc.returncode}") + if ( + output_monitor + and output_monitor.returncode != 0 + and output_monitor.returncode is not None + ): + print(f"nix-output-monitor exited with {output_monitor.returncode}") - if len(build_failures) > 0 or len(eval_error) > 0 or eval_rc != 0: - return 1 - else: - return 0 + return 0 -def main() -> None: - opts = parse_args(sys.argv[1:]) +async def main() -> None: + nix_config = await get_nix_config() + opts = parse_args(sys.argv[1:], nix_config) rc = 0 - with ExitStack() as stack: - rc = run_builds(stack, opts) + async with AsyncExitStack() as stack: + if opts.remote_url: + opts.flake_url = upload_sources(opts.remote_url, opts.flake_url) + rc = await run(stack, opts) sys.exit(rc) - - -if __name__ == "__main__": - main() diff --git a/pyproject.toml b/pyproject.toml index c442703..4172a12 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -13,7 +13,6 @@ classifiers = [ "Environment :: Console", "Topic :: Utilities", "Intended Audience :: Developers", - "Programming Language :: Python :: 3.6", ] [project.urls] @@ -58,7 +57,7 @@ exclude = ''' ''' [tool.mypy] -python_version = "3.10" +python_version = "3.11" warn_redundant_casts = true disallow_untyped_calls = true disallow_untyped_defs = true