maintainers/scripts/update.nix: switch to asyncio

This will make it cleaner and also better respect SIGTERM.
This commit is contained in:
Jan Tojnar 2020-09-18 22:22:42 +02:00
parent 17f89667b3
commit 01b9d5371c
No known key found for this signature in database
GPG Key ID: 7FAB2A15F7A607A4

View File

@ -1,35 +1,65 @@
from typing import Dict, Generator, Tuple, Union from __future__ import annotations
from typing import Dict, Generator, List, Optional, Tuple
import argparse import argparse
import asyncio
import contextlib import contextlib
import concurrent.futures
import json import json
import os import os
import subprocess import subprocess
import sys import sys
import tempfile import tempfile
import threading
updates: Dict[concurrent.futures.Future, Dict] = {} class CalledProcessError(Exception):
process: asyncio.subprocess.Process
TempDirs = Dict[str, Tuple[str, str, threading.Lock]]
thread_name_prefix = 'UpdateScriptThread'
def eprint(*args, **kwargs): def eprint(*args, **kwargs):
print(*args, file=sys.stderr, **kwargs) print(*args, file=sys.stderr, **kwargs)
def run_update_script(package: Dict, commit: bool, temp_dirs: TempDirs) -> subprocess.CompletedProcess: async def check_subprocess(*args, **kwargs):
worktree: Union[None, str] = None """
Emulate check argument of subprocess.run function.
"""
process = await asyncio.create_subprocess_exec(*args, **kwargs)
returncode = await process.wait()
if commit and 'commit' in package['supportedFeatures']: if returncode != 0:
thread_name = threading.current_thread().name error = CalledProcessError()
worktree, _branch, lock = temp_dirs[thread_name] error.process = process
lock.acquire()
package['thread'] = thread_name raise error
return process
async def run_update_script(merge_lock: asyncio.Lock, temp_dir: Optional[Tuple[str, str]], package: Dict, keep_going: bool):
worktree: Optional[str] = None
if temp_dir is not None:
worktree, _branch = temp_dir
eprint(f" - {package['name']}: UPDATING ...") eprint(f" - {package['name']}: UPDATING ...")
return subprocess.run(package['updateScript'], stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True, cwd=worktree) try:
update_process = await check_subprocess(*package['updateScript'], stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, cwd=worktree)
update_info = await update_process.stdout.read()
await merge_changes(merge_lock, package, update_info, temp_dir)
except KeyboardInterrupt as e:
eprint('Cancelling…')
raise asyncio.exceptions.CancelledError()
except CalledProcessError as e:
eprint(f" - {package['name']}: ERROR")
eprint()
eprint(f"--- SHOWING ERROR LOG FOR {package['name']} ----------------------")
eprint()
stderr = await e.process.stderr.read()
eprint(stderr.decode('utf-8'))
with open(f"{package['pname']}.log", 'wb') as logfile:
logfile.write(stderr)
eprint()
eprint(f"--- SHOWING ERROR LOG FOR {package['name']} ----------------------")
if not keep_going:
raise asyncio.exceptions.CancelledError()
@contextlib.contextmanager @contextlib.contextmanager
def make_worktree() -> Generator[Tuple[str, str], None, None]: def make_worktree() -> Generator[Tuple[str, str], None, None]:
@ -37,46 +67,76 @@ def make_worktree() -> Generator[Tuple[str, str], None, None]:
branch_name = f'update-{os.path.basename(wt)}' branch_name = f'update-{os.path.basename(wt)}'
target_directory = f'{wt}/nixpkgs' target_directory = f'{wt}/nixpkgs'
subprocess.run(['git', 'worktree', 'add', '-b', branch_name, target_directory], check=True) subprocess.run(['git', 'worktree', 'add', '-b', branch_name, target_directory])
yield (target_directory, branch_name) yield (target_directory, branch_name)
subprocess.run(['git', 'worktree', 'remove', target_directory], check=True) subprocess.run(['git', 'worktree', 'remove', '--force', target_directory])
subprocess.run(['git', 'branch', '-D', branch_name], check=True) subprocess.run(['git', 'branch', '-D', branch_name])
def commit_changes(worktree: str, branch: str, execution: subprocess.CompletedProcess) -> None: async def commit_changes(merge_lock: asyncio.Lock, worktree: str, branch: str, update_info: str) -> None:
changes = json.loads(execution.stdout) changes = json.loads(update_info)
for change in changes: for change in changes:
subprocess.run(['git', 'add'] + change['files'], check=True, cwd=worktree) # Git can only handle a single index operation at a time
commit_message = '{attrPath}: {oldVersion}{newVersion}'.format(**change) async with merge_lock:
subprocess.run(['git', 'commit', '-m', commit_message], check=True, cwd=worktree) await check_subprocess('git', 'add', *change['files'], cwd=worktree)
subprocess.run(['git', 'cherry-pick', branch], check=True) commit_message = '{attrPath}: {oldVersion}{newVersion}'.format(**change)
await check_subprocess('git', 'commit', '--quiet', '-m', commit_message, cwd=worktree)
await check_subprocess('git', 'cherry-pick', branch)
def merge_changes(package: Dict, future: concurrent.futures.Future, commit: bool, keep_going: bool, temp_dirs: TempDirs) -> None: async def merge_changes(merge_lock: asyncio.Lock, package: Dict, update_info: str, temp_dir: Optional[Tuple[str, str]]) -> None:
try: if temp_dir is not None:
execution = future.result() worktree, branch = temp_dir
if commit and 'commit' in package['supportedFeatures']: await commit_changes(merge_lock, worktree, branch, update_info)
thread_name = package['thread'] eprint(f" - {package['name']}: DONE.")
worktree, branch, lock = temp_dirs[thread_name]
commit_changes(worktree, branch, execution)
eprint(f" - {package['name']}: DONE.")
except subprocess.CalledProcessError as e:
eprint(f" - {package['name']}: ERROR")
eprint()
eprint(f"--- SHOWING ERROR LOG FOR {package['name']} ----------------------")
eprint()
eprint(e.stdout.decode('utf-8'))
with open(f"{package['pname']}.log", 'wb') as logfile:
logfile.write(e.stdout)
eprint()
eprint(f"--- SHOWING ERROR LOG FOR {package['name']} ----------------------")
if not keep_going: async def updater(temp_dir: Optional[Tuple[str, str]], merge_lock: asyncio.Lock, packages_to_update: asyncio.Queue[Optional[Dict]], keep_going: bool, commit: bool):
sys.exit(1) while True:
finally: package = await packages_to_update.get()
if commit and 'commit' in package['supportedFeatures']: if package is None:
lock.release() # A sentinel received, we are done.
return
def main(max_workers: int, keep_going: bool, commit: bool, packages: Dict) -> None: if not ('commit' in package['supportedFeatures']):
with open(sys.argv[1]) as f: temp_dir = None
await run_update_script(merge_lock, temp_dir, package, keep_going)
async def start_updates(max_workers: int, keep_going: bool, commit: bool, packages: List[Dict]):
merge_lock = asyncio.Lock()
packages_to_update: asyncio.Queue[Optional[Dict]] = asyncio.Queue()
with contextlib.ExitStack() as stack:
temp_dirs: List[Optional[Tuple[str, str]]] = []
# Do not create more workers than there are packages.
num_workers = min(max_workers, len(packages))
# Set up temporary directories when using auto-commit.
for i in range(num_workers):
temp_dir = stack.enter_context(make_worktree()) if commit else None
temp_dirs.append(temp_dir)
# Fill up an update queue,
for package in packages:
await packages_to_update.put(package)
# Add sentinels, one for each worker.
# A workers will terminate when it gets sentinel from the queue.
for i in range(num_workers):
await packages_to_update.put(None)
# Prepare updater workers for each temp_dir directory.
# At most `num_workers` instances of `run_update_script` will be running at one time.
updaters = asyncio.gather(*[updater(temp_dir, merge_lock, packages_to_update, keep_going, commit) for temp_dir in temp_dirs])
try:
# Start updater workers.
await updaters
except asyncio.exceptions.CancelledError as e:
# When one worker is cancelled, cancel the others too.
updaters.cancel()
def main(max_workers: int, keep_going: bool, commit: bool, packages_path: str) -> None:
with open(packages_path) as f:
packages = json.load(f) packages = json.load(f)
eprint() eprint()
@ -90,19 +150,7 @@ def main(max_workers: int, keep_going: bool, commit: bool, packages: Dict) -> No
eprint() eprint()
eprint('Running update for:') eprint('Running update for:')
with contextlib.ExitStack() as stack, concurrent.futures.ThreadPoolExecutor(max_workers=max_workers, thread_name_prefix=thread_name_prefix) as executor: asyncio.run(start_updates(max_workers, keep_going, commit, packages))
if commit:
temp_dirs = {f'{thread_name_prefix}_{str(i)}': (*stack.enter_context(make_worktree()), threading.Lock()) for i in range(max_workers)}
else:
temp_dirs = {}
for package in packages:
updates[executor.submit(run_update_script, package, commit, temp_dirs)] = package
for future in concurrent.futures.as_completed(updates):
package = updates[future]
merge_changes(package, future, commit, keep_going, temp_dirs)
eprint() eprint()
eprint('Packages updated!') eprint('Packages updated!')
@ -122,8 +170,6 @@ if __name__ == '__main__':
try: try:
main(args.max_workers, args.keep_going, args.commit, args.packages) main(args.max_workers, args.keep_going, args.commit, args.packages)
except (KeyboardInterrupt, SystemExit) as e: except KeyboardInterrupt as e:
for update in updates: # Lets cancel outside of the main loop too.
update.cancel() sys.exit(130)
sys.exit(e.code if isinstance(e, SystemExit) else 130)