Merge pull request #438 from hsjobeki/fix/format

chore: add formatting of Python
DavHau 2022-12-30 10:25:09 +08:00 committed by GitHub
commit 4be2700076
23 changed files with 899 additions and 798 deletions

View File

@@ -177,10 +177,15 @@
// {
# passes through extra flags to treefmt
format.type = "app";
format.program =
format.program = let
path = lib.makeBinPath [
alejandra.defaultPackage.${system}
pkgs.python3.pkgs.black
];
in
l.toString
(pkgs.writeScript "format" ''
export PATH="${alejandra.defaultPackage.${system}}/bin"
export PATH="${path}"
${pkgs.treefmt}/bin/treefmt --clear-cache "$@"
'');
@@ -205,6 +210,11 @@
dream2nix-shell = mkShell {
devshell.name = "dream2nix-devshell";
packages = [
alejandra.defaultPackage.${system}
pkgs.python3.pkgs.black
];
commands =
[
{package = pkgs.nix;}
@@ -221,10 +231,6 @@
package = pkgs.treefmt;
category = "formatting";
}
{
package = alejandra.defaultPackage.${system};
category = "formatting";
}
]
# using linux is highly recommended as cntr is amazing for debugging builds
++ lib.optional pkgs.stdenv.isLinux {
@@ -273,8 +279,8 @@
};
cleanup = {
enable = true;
name = "cleaned";
entry = l.toString (pkgs.writeScript "cleaned" ''
name = "cleanup";
entry = l.toString (pkgs.writeScript "cleanup" ''
#!${pkgs.bash}/bin/bash
for badFile in $(find ./examples | grep 'flake.lock\|dream2nix-packages'); do
rm -rf $badFile

View File

@@ -1,10 +1,11 @@
import sys
import json
def replace_root_sources(lock, newSource):
packages = lock['_generic']['packages']
sources = lock['sources']
packages = lock["_generic"]["packages"]
sources = lock["sources"]
for name, version in packages.items():
@@ -12,17 +13,17 @@ def replace_root_sources(lock, newSource):
fixed = newSource
if 'dir' in original:
fixed['dir'] = original['dir']
if "dir" in original:
fixed["dir"] = original["dir"]
sources[name][version] = fixed
lock['sources'] = sources
lock["sources"] = sources
return lock
if __name__ == '__main__':
if __name__ == "__main__":
lockFile = sys.argv[1]
newSourceFile = sys.argv[2]
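
A minimal sketch of what replace_root_sources achieves, using a hypothetical lock and source (and assuming the lines elided from this hunk look up the original source entry):

lock = {
    "_generic": {"packages": {"example": "1.2.3"}},
    "sources": {"example": {"1.2.3": {"type": "path", "dir": "sub/dir"}}},
}
new_source = {"type": "github", "owner": "o", "repo": "r"}  # hypothetical
fixed = replace_root_sources(lock, new_source)
# the root source is swapped, but an existing "dir" attribute is carried over:
assert fixed["sources"]["example"]["1.2.3"]["dir"] == "sub/dir"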

View File

@@ -3,8 +3,8 @@ import os
import sys
failed_proj_ids = list(os.listdir('translation-errors'))
failed_proj_ids = list(os.listdir("translation-errors"))
print("saving list of failed projects in ./translation-errors.json")
print("failure logs can be found in ./translation-errors/")
with open(sys.argv[1], 'w') as f:
json.dump(failed_proj_ids, f, indent=2)
with open(sys.argv[1], "w") as f:
json.dump(failed_proj_ids, f, indent=2)

View File

@@ -7,17 +7,19 @@ number = int(os.environ.get("number"))
input = json.load(sys.stdin)
projects = []
for package in input['crates']:
projects.append(dict(
id=f"{package['name']}-{package['max_stable_version']}",
name=package['name'],
version=package['max_stable_version'],
translator='crates-io',
))
for package in input["crates"]:
projects.append(
dict(
id=f"{package['name']}-{package['max_stable_version']}",
name=package["name"],
version=package["max_stable_version"],
translator="crates-io",
)
)
with open(out_file) as f:
existing_projects = json.load(f)
existing_projects = json.load(f)
all_projects = (existing_projects + projects)[:number]
with open(out_file, 'w') as f:
json.dump(all_projects, f, indent=2)
with open(out_file, "w") as f:
json.dump(all_projects, f, indent=2)

View File

@@ -7,33 +7,35 @@ platform = os.environ.get("platform")
number = int(os.environ.get("number"))
if platform == "hackage":
sort_key = lambda v: [int(n) for n in v['number'].split('.')]
sort_key = lambda v: [int(n) for n in v["number"].split(".")]
else:
sort_key = key=lambda v: v['published_at']
sort_key = lambda v: v["published_at"]
input = json.load(sys.stdin)
projects = []
for package in input:
versions = package['versions']
versions = sorted(versions, key=sort_key, reverse=True)
if versions:
# latest_stable_release_number is often wrong for hackage
if platform == "hackage":
latest_version = versions[0]['number']
else:
latest_version = package["latest_stable_release_number"]
if latest_version == None:
latest_version = versions[0]['number']
projects.append(dict(
id=f"{package['name']}-{latest_version}",
name=package['name'],
version=latest_version,
translator=platform,
))
versions = package["versions"]
versions = sorted(versions, key=sort_key, reverse=True)
if versions:
# latest_stable_release_number is often wrong for hackage
if platform == "hackage":
latest_version = versions[0]["number"]
else:
latest_version = package["latest_stable_release_number"]
if latest_version is None:
latest_version = versions[0]["number"]
projects.append(
dict(
id=f"{package['name']}-{latest_version}",
name=package["name"],
version=latest_version,
translator=platform,
)
)
with open(out_file) as f:
existing_projects = json.load(f)
existing_projects = json.load(f)
all_projects = (existing_projects + projects)[:number]
with open(out_file, 'w') as f:
json.dump(all_projects, f, indent=2)
with open(out_file, "w") as f:
json.dump(all_projects, f, indent=2)
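
The numeric sort key above exists because plain string ordering mis-ranks versions; a small illustration with made-up version dicts:

versions = [{"number": "1.9.0"}, {"number": "1.10.0"}]
# as strings, "1.9.0" sorts above "1.10.0"; as integer components, [1, 10, 0] > [1, 9, 0]
ordered = sorted(versions, key=lambda v: [int(n) for n in v["number"].split(".")], reverse=True)
assert ordered[0]["number"] == "1.10.0"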

View File

@@ -3,12 +3,14 @@ import sys
input = json.load(sys.stdin)
projects = []
for object in input['objects']:
package = object['package']
projects.append(dict(
id=f"{package['name']}-{package['version']}".replace('/', '_'),
name=package['name'],
version=package['version'],
translator='npm',
))
for object in input["objects"]:
package = object["package"]
projects.append(
dict(
id=f"{package['name']}-{package['version']}".replace("/", "_"),
name=package["name"],
version=package["version"],
translator="npm",
)
)
print(json.dumps(projects, indent=2))

View File

@@ -5,7 +5,7 @@ from pathlib import Path
def store_error(attrPath, category, text, name=None):
with open(f"errors/{attrPath.replace('/', '--')}", 'w') as f:
with open(f"errors/{attrPath.replace('/', '--')}", "w") as f:
json.dump(
dict(
attrPath=attrPath,
@@ -18,42 +18,37 @@ def store_error(attrPath, category, text, name=None):
input = json.loads(sys.argv[1])
attr = input['attr']
attrPath = '.'.join(input['attrPath'])
attr = input["attr"]
attrPath = ".".join(input["attrPath"])
# handle eval error
if "error" in input:
error = input['error']
error = input["error"]
print(
f"Evaluation failed. attr: {attr} attrPath: {attrPath}\n"
"Error:\n{error}",
file=sys.stderr
f"Evaluation failed. attr: {attr} attrPath: {attrPath}\n" "Error:\n{error}",
file=sys.stderr,
)
store_error(attrPath, 'eval', error)
store_error(attrPath, "eval", error)
# try to build package
else:
name = input['name']
drvPath = input['drvPath']
name = input["name"]
drvPath = input["drvPath"]
print(
f"Building {name} attr: {attr} attrPath: {attrPath} "
f"drvPath: ({drvPath})",
file=sys.stderr
f"Building {name} attr: {attr} attrPath: {attrPath} " f"drvPath: ({drvPath})",
file=sys.stderr,
)
try:
proc = sp.run(
['nix', 'build', '-L', drvPath],
["nix", "build", "-L", drvPath],
capture_output=True,
check=True,
)
print(
f"Finished {name}. attr: {attr} attrPath: {attrPath}",
file=sys.stderr
)
print(f"Finished {name}. attr: {attr} attrPath: {attrPath}", file=sys.stderr)
# handle build error
except sp.CalledProcessError as error:
Path('errors').mkdir(exist_ok=True)
Path("errors").mkdir(exist_ok=True)
print(
f"Error while building {name}. attr: {attr} attrPath: {attrPath}",
file=sys.stderr
file=sys.stderr,
)
store_error(attrPath, 'build', error.stderr.decode(), name)
store_error(attrPath, "build", error.stderr.decode(), name)

View File

@@ -1,7 +1,7 @@
import json
import os
error_files = os.listdir('errors')
error_files = os.listdir("errors")
eval_errors = 0
build_errors = 0
all_errors = {}
@@ -10,9 +10,9 @@ for file in error_files:
with open(f"errors/{file}") as f:
error = json.load(f)
# add error to all_errors
all_errors[error['attrPath']] = error
all_errors[error["attrPath"]] = error
# count error types
if error['category'] == 'eval':
if error["category"] == "eval":
eval_errors += 1
else:
build_errors += 1
@@ -25,8 +25,8 @@ stats = dict(
errors_build=build_errors,
)
with open("errors.json", 'w') as f:
with open("errors.json", "w") as f:
json.dump(all_errors, f)
with open('stats.json', 'w') as f:
with open("stats.json", "w") as f:
json.dump(stats, f)

View File

@@ -5,46 +5,53 @@ import sys
from nix_ffi import nix
def strip_hashes_from_lock(lock):
for name, versions in lock['sources'].items():
for source in versions.values():
if 'hash' in source:
del source['hash']
for name, versions in lock["sources"].items():
for source in versions.values():
if "hash" in source:
del source["hash"]
def aggregate_hashes(lock, outputDreamLock, dream2nix_src, dream2nix_config):
print("Building FOD of aggregated sources to retrieve output hash")
# remove hashes from lock file and init sourcesAggregatedHash with empty string
strip_hashes_from_lock(lock)
lock['_generic']['sourcesAggregatedHash'] = ""
with open(outputDreamLock, 'w') as f:
json.dump(lock, f, indent=2)
lock["_generic"]["sourcesAggregatedHash"] = ""
with open(outputDreamLock, "w") as f:
json.dump(lock, f, indent=2)
# compute FOD hash of aggregated sources
proc = nix(
"build", "--impure", "-L", "--show-trace", "--expr",
f"(import {dream2nix_src} {{ dream2nixConfig = {dream2nix_config}; }}).dream2nix-interface.fetchSources {{ dreamLock = {outputDreamLock}; }}"
"build",
"--impure",
"-L",
"--show-trace",
"--expr",
f"(import {dream2nix_src} {{ dream2nixConfig = {dream2nix_config}; }}).dream2nix-interface.fetchSources {{ dreamLock = {outputDreamLock}; }}",
)
print(proc.stderr.decode())
# read the output hash from the failed build log
match = re.search(r"FOD_HASH=(.*=)", proc.stderr.decode())
if not match:
print(proc.stderr.decode())
print(proc.stdout.decode())
print(
"Error: Could not find FOD hash in FOD log",
file=sys.stderr,
)
print(proc.stderr.decode())
print(proc.stdout.decode())
print(
"Error: Could not find FOD hash in FOD log",
file=sys.stderr,
)
exit(1)  # match is None here; bail out before match.groups() below
hash = match.groups()[0]
print(f"Computed FOD hash: {hash}")
# store the hash in the lock
lock['_generic']['sourcesAggregatedHash'] = hash
lock["_generic"]["sourcesAggregatedHash"] = hash
return lock
if __name__ == '__main__':
dreamLockFile = sys.argv[1]
with open(dreamLockFile) as f:
lock = json.load(f)
dream2nix_src = os.environ.get('dream2nixWithExternals')
dream2nix_config = os.environ.get('dream2nixConfig')
new_lock = aggregate_hashes(lock, dreamLockFile, dream2nix_src, dream2nix_config)
with open(dreamLockFile, 'w') as f:
json.dump(new_lock, f, indent=2)
if __name__ == "__main__":
dreamLockFile = sys.argv[1]
with open(dreamLockFile) as f:
lock = json.load(f)
dream2nix_src = os.environ.get("dream2nixWithExternals")
dream2nix_config = os.environ.get("dream2nixConfig")
new_lock = aggregate_hashes(lock, dreamLockFile, dream2nix_src, dream2nix_config)
with open(dreamLockFile, "w") as f:
json.dump(new_lock, f, indent=2)
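
The FOD trick above depends on the failed build log exposing the computed hash; a minimal sketch of what the regex extracts, with a hypothetical log excerpt:

import re

log = "building ...\nFOD_HASH=sha256-AAAABBBB=\n"  # hypothetical build log
match = re.search(r"FOD_HASH=(.*=)", log)
assert match and match.groups()[0] == "sha256-AAAABBBB="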

View File

@@ -3,13 +3,15 @@ import sys
def format_lock_str(lock):
lockStr = json.dumps(lock, indent=2, sort_keys=True)
lockStr = lockStr \
.replace("[\n ", "[ ") \
.replace("\"\n ]", "\" ]") \
.replace(",\n ", ", ")
return lockStr
lockStr = json.dumps(lock, indent=2, sort_keys=True)
lockStr = (
lockStr.replace("[\n ", "[ ")
.replace('"\n ]', '" ]')
.replace(",\n ", ", ")
)
return lockStr
if __name__ == '__main__':
lock = json.loads(sys.stdin.read())
print(format_lock_str(lock))
if __name__ == "__main__":
lock = json.loads(sys.stdin.read())
print(format_lock_str(lock))
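
A small usage sketch of the compaction above (hypothetical lock content): json.dumps with indent=2 would spread the list over four lines, while the chained replaces pull short arrays back onto one line.

lock = {"deps": ["a", "b"]}
print(format_lock_str(lock))
# {
#   "deps": [ "a", "b" ]
# }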

View File

@@ -6,58 +6,69 @@ import tempfile
dream2nix_src = os.environ.get("dream2nixSrc")
def nix(*args, **kwargs):
return sp.run(["nix", "--option", "experimental-features", "nix-command flakes"] + list(args), capture_output=True, **kwargs)
return sp.run(
["nix", "--option", "experimental-features", "nix-command flakes"] + list(args),
capture_output=True,
**kwargs,
)
# TODO: deprecate and replace all usage with `eval()` (see below).
def callNixFunction(function_path, **kwargs):
with tempfile.NamedTemporaryFile("w") as input_json_file:
json.dump(dict(**kwargs), input_json_file, indent=2)
input_json_file.seek(0) # flushes write cache
env = os.environ.copy()
env.update(dict(
FUNC_ARGS=input_json_file.name
))
proc = nix(
"eval", "--show-trace", "--impure", "--raw", "--expr",
f'''
with tempfile.NamedTemporaryFile("w") as input_json_file:
json.dump(dict(**kwargs), input_json_file, indent=2)
input_json_file.seek(0) # flushes write cache
env = os.environ.copy()
env.update(dict(FUNC_ARGS=input_json_file.name))
proc = nix(
"eval",
"--show-trace",
"--impure",
"--raw",
"--expr",
f"""
let
d2n = (import {dream2nix_src} {{}});
in
builtins.toJSON (
(d2n.dlib.callViaEnv d2n.{function_path})
)
''',
env=env
)
if proc.returncode:
print(f"Failed calling nix function '{function_path}'", file=sys.stderr)
print(proc.stderr.decode(), file=sys.stderr)
exit(1)
""",
env=env,
)
if proc.returncode:
print(f"Failed calling nix function '{function_path}'", file=sys.stderr)
print(proc.stderr.decode(), file=sys.stderr)
exit(1)
# parse result data
return json.loads(proc.stdout)
# parse result data
return json.loads(proc.stdout)
def eval(attr_path, wrapper_code=None, **kwargs):
if wrapper_code == None:
# dummy wrapper code
wrapper_code = "{result, ...}: result"
if wrapper_code is None:
# dummy wrapper code
wrapper_code = "{result, ...}: result"
is_function_call = len(kwargs) > 0
is_function_call = len(kwargs) > 0
with tempfile.NamedTemporaryFile("w") as input_json_file:
json.dump(dict(**kwargs), input_json_file, indent=2)
input_json_file.seek(0) # flushes write cache
env = os.environ.copy()
env.update(dict(
FUNC_ARGS=input_json_file.name
))
with tempfile.NamedTemporaryFile("w") as wrapper_code_file:
wrapper_code_file.write(wrapper_code)
wrapper_code_file.seek(0) # flushes write cache
proc = nix(
"eval", "--show-trace", "--impure", "--raw", "--expr",
f'''
with tempfile.NamedTemporaryFile("w") as input_json_file:
json.dump(dict(**kwargs), input_json_file, indent=2)
input_json_file.seek(0) # flushes write cache
env = os.environ.copy()
env.update(dict(FUNC_ARGS=input_json_file.name))
with tempfile.NamedTemporaryFile("w") as wrapper_code_file:
wrapper_code_file.write(wrapper_code)
wrapper_code_file.seek(0) # flushes write cache
proc = nix(
"eval",
"--show-trace",
"--impure",
"--raw",
"--expr",
f"""
let
b = builtins;
d2n = (import {dream2nix_src} {{}});
@@ -75,57 +86,65 @@ def eval(attr_path, wrapper_code=None, **kwargs):
then b.removeAttrs result ["override" "overrideDerivation"]
else result
)
''',
env=env
)
if proc.returncode:
print(f"Failed evaluating '{attr_path}'", file=sys.stderr)
print(proc.stderr.decode(), file=sys.stderr)
exit(1)
""",
env=env,
)
if proc.returncode:
print(f"Failed evaluating '{attr_path}'", file=sys.stderr)
print(proc.stderr.decode(), file=sys.stderr)
exit(1)
# parse result data
return json.loads(proc.stdout)
# parse result data
return json.loads(proc.stdout)
def buildNixFunction(function_path, **kwargs):
with tempfile.NamedTemporaryFile("w") as input_json_file:
json.dump(dict(**kwargs), input_json_file, indent=2)
input_json_file.seek(0) # flushes write cache
env = os.environ.copy()
env.update(dict(
FUNC_ARGS=input_json_file.name
))
proc = nix(
"build", "--show-trace", "--impure", "-o", "tmp-result", "--expr",
f'''
with tempfile.NamedTemporaryFile("w") as input_json_file:
json.dump(dict(**kwargs), input_json_file, indent=2)
input_json_file.seek(0) # flushes write cache
env = os.environ.copy()
env.update(dict(FUNC_ARGS=input_json_file.name))
proc = nix(
"build",
"--show-trace",
"--impure",
"-o",
"tmp-result",
"--expr",
f"""
let
d2n = (import {dream2nix_src} {{}});
in
(d2n.dlib.callViaEnv d2n.{function_path})
''',
env=env
)
if proc.returncode:
print(f"Failed calling nix function '{function_path}'", file=sys.stderr)
print(proc.stderr.decode(), file=sys.stderr)
exit(1)
""",
env=env,
)
if proc.returncode:
print(f"Failed calling nix function '{function_path}'", file=sys.stderr)
print(proc.stderr.decode(), file=sys.stderr)
exit(1)
# return store path of result
result = os.path.realpath("tmp-result")
os.remove("tmp-result")
return result
# return store path of result
result = os.path.realpath("tmp-result")
os.remove("tmp-result")
return result
def buildNixAttribute(attribute_path):
proc = nix(
"build", "--show-trace", "--impure", "-o", "tmp-result", "--expr",
f"(import {dream2nix_src} {{}}).{attribute_path}",
)
if proc.returncode:
print(f"Failed to build '{attribute_path}'", file=sys.stderr)
print(proc.stderr.decode(), file=sys.stderr)
exit(1)
proc = nix(
"build",
"--show-trace",
"--impure",
"-o",
"tmp-result",
"--expr",
f"(import {dream2nix_src} {{}}).{attribute_path}",
)
if proc.returncode:
print(f"Failed to build '{attribute_path}'", file=sys.stderr)
print(proc.stderr.decode(), file=sys.stderr)
exit(1)
result = os.path.realpath("tmp-result")
os.remove("tmp-result")
return result
result = os.path.realpath("tmp-result")
os.remove("tmp-result")
return result
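
A minimal usage sketch of the helpers above, mirroring the test suite further down (it assumes the dream2nixSrc environment variable points at a dream2nix checkout):

latest = callNixFunction("dlib.latestVersion", versions=["2", "3", "1"])
assert latest == "3"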

View File

@@ -8,39 +8,63 @@ import subprocess
# for initialization
def update_apt():
subprocess.run(
["apt",
"-o", "Acquire::AllowInsecureRepositories=1",
"-o", "Dir::State::status=./status",
"-o", "Dir::Etc=./etc/apt",
"-o" ,"Dir::State=./state",
"update"
])
[
"apt",
"-o",
"Acquire::AllowInsecureRepositories=1",
"-o",
"Dir::State::status=./status",
"-o",
"Dir::Etc=./etc/apt",
"-o",
"Dir::State=./state",
"update",
]
)
def get_package_info_apt(name):
result = subprocess.run(
["apt",
"-o Acquire::AllowInsecureRepositories=1",
"-o", "Dir::State::status=./status",
"-o", "Dir::Etc=./etc/apt",
[
"apt",
"-o Acquire::AllowInsecureRepositories=1",
"-o",
"Dir::State::status=./status",
"-o",
"Dir::Etc=./etc/apt",
"-o" "Dir::State=./state",
"install", f"{name}", "--print-uris",
"install",
f"{name}",
"--print-uris",
],
stdout=subprocess.PIPE,
text=True,
)
print(f"result {result.stdout}")
with open('./deb-uris', 'w') as f:
with open("./deb-uris", "w") as f:
f.write(result.stdout)
subprocess.run(
["apt",
"-o", "Acquire::AllowInsecureRepositories=1",
"-o", "Dir::State::status=./status",
"-o", "Dir::Etc=./etc/apt",
"-o", "Dir::Cache=./download",
"-o", "Dir::State=./state",
"install", f"{name}", "--download-only", "-y" ,"--allow-unauthenticated",
])
[
"apt",
"-o",
"Acquire::AllowInsecureRepositories=1",
"-o",
"Dir::State::status=./status",
"-o",
"Dir::Etc=./etc/apt",
"-o",
"Dir::Cache=./download",
"-o",
"Dir::State=./state",
"install",
f"{name}",
"--download-only",
"-y",
"--allow-unauthenticated",
]
)
def main():
update_apt()
@@ -96,7 +120,7 @@ def main():
)[0]
# dump dream lock to $outputFile
outputFile = (os.environ.get("outputFile"))
outputFile = os.environ.get("outputFile")
dirPath = pathlib.Path(os.path.dirname(outputFile))
dirPath.mkdir(parents=True, exist_ok=True)
with open(outputFile, "w") as lock:

View File

@@ -3,15 +3,15 @@ import os
import sys
lock = json.load(sys.stdin)
version = os.environ.get('version')
hash = os.environ.get('hash')
version = os.environ.get("version")
hash = os.environ.get("hash")
# set default package version correctly
name = lock['_generic']['defaultPackage']
lock['sources'][name][version] = dict(
type="http",
url=f"https://hackage.haskell.org/package/{name}-{version}/{name}-{version}.tar.gz",
hash=f"sha256:{hash}",
name = lock["_generic"]["defaultPackage"]
lock["sources"][name][version] = dict(
type="http",
url=f"https://hackage.haskell.org/package/{name}-{version}/{name}-{version}.tar.gz",
hash=f"sha256:{hash}",
)
print(json.dumps(lock, indent=2))

View File

@@ -4,24 +4,23 @@ import pathlib
import sys
with open(os.environ.get('dependenciesJsonPath')) as f:
available_deps = json.load(f)
with open(os.environ.get("dependenciesJsonPath")) as f:
available_deps = json.load(f)
with open('package.json', encoding="utf-8-sig") as f:
package_json = json.load(f)
with open("package.json", encoding="utf-8-sig") as f:
package_json = json.load(f)
changed = False
# fail if platform incompatible
if 'os' in package_json:
platform = sys.platform
if platform not in package_json['os']\
or f"!{platform}" in package_json['os']:
print(
f"Package is not compatible with current platform '{platform}'",
file=sys.stderr
)
exit(3)
if "os" in package_json:
platform = sys.platform
if platform not in package_json["os"] or f"!{platform}" in package_json["os"]:
print(
f"Package is not compatible with current platform '{platform}'",
file=sys.stderr,
)
exit(3)
# replace version
# If it is a github dependency referred by revision,
@@ -29,46 +28,47 @@ if 'os' in package_json:
# In case of an 'unknown' version coming from the dream lock,
# do not override the version from package.json
version = os.environ.get("version")
if version not in ["unknown", package_json.get('version')]:
print(
"WARNING: The version of this package defined by its package.json "
"doesn't match the version expected by dream2nix."
"\n -> Replacing version in package.json: "
f"{package_json.get('version')} -> {version}",
file=sys.stderr
)
changed = True
package_json['version'] = version
if version not in ["unknown", package_json.get("version")]:
print(
"WARNING: The version of this package defined by its package.json "
"doesn't match the version expected by dream2nix."
"\n -> Replacing version in package.json: "
f"{package_json.get('version')} -> {version}",
file=sys.stderr,
)
changed = True
package_json["version"] = version
# pinpoint exact versions
# This is mostly needed to replace git references with exact versions,
# as NPM install will otherwise re-fetch these
if 'dependencies' in package_json:
dependencies = package_json['dependencies']
# dependencies can be a list or dict
for pname in dependencies:
if 'bundledDependencies' in package_json\
and pname in package_json['bundledDependencies']:
continue
if pname not in available_deps:
print(
f"WARNING: Dependency {pname} wanted but not available. Ignoring.",
file=sys.stderr
)
continue
version =\
'unknown' if isinstance(dependencies, list) else dependencies[pname]
if available_deps[pname] != version:
version = available_deps[pname]
changed = True
print(
f"package.json: Pinning version '{version}' to '{available_deps[pname]}'"
f" for dependency '{pname}'",
file=sys.stderr
)
if "dependencies" in package_json:
dependencies = package_json["dependencies"]
# dependencies can be a list or dict
for pname in dependencies:
if (
"bundledDependencies" in package_json
and pname in package_json["bundledDependencies"]
):
continue
if pname not in available_deps:
print(
f"WARNING: Dependency {pname} wanted but not available. Ignoring.",
file=sys.stderr,
)
continue
version = "unknown" if isinstance(dependencies, list) else dependencies[pname]
if available_deps[pname] != version:
version = available_deps[pname]
changed = True
print(
f"package.json: Pinning version '{version}' to '{available_deps[pname]}'"
f" for dependency '{pname}'",
file=sys.stderr,
)
# write changes to package.json
if changed:
with open('package.json', 'w') as f:
json.dump(package_json, f, indent=2)
with open("package.json", "w") as f:
json.dump(package_json, f, indent=2)
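
The pinning loop above boils down to overriding loose or git-style specifiers with the versions dream2nix actually provides; a distilled, hypothetical illustration:

dependencies = {"left-pad": "github:owner/repo#abc123"}  # hypothetical package.json entry
available_deps = {"left-pad": "1.3.0"}  # hypothetical resolved versions
pinned = {name: available_deps.get(name, spec) for name, spec in dependencies.items()}
assert pinned == {"left-pad": "1.3.0"}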

View File

@@ -6,196 +6,204 @@ import subprocess as sp
import sys
pname = os.environ.get('packageName')
version = os.environ.get('version')
pname = os.environ.get("packageName")
version = os.environ.get("version")
bin_dir = f"{os.path.abspath('..')}/.bin"
root = f"{os.path.abspath('.')}/node_modules"
package_json_cache = {}
with open(os.environ.get("nodeDepsPath")) as f:
nodeDeps = f.read().split()
nodeDeps = f.read().split()
def get_package_json(path):
if path not in package_json_cache:
if not os.path.isfile(f"{path}/package.json"):
return None
with open(f"{path}/package.json", encoding="utf-8-sig") as f:
package_json_cache[path] = json.load(f)
return package_json_cache[path]
if path not in package_json_cache:
if not os.path.isfile(f"{path}/package.json"):
return None
with open(f"{path}/package.json", encoding="utf-8-sig") as f:
package_json_cache[path] = json.load(f)
return package_json_cache[path]
def install_direct_dependencies():
if not os.path.isdir(root):
os.mkdir(root)
with open(os.environ.get('nodeDepsPath')) as f:
deps = f.read().split()
for dep in deps:
if os.path.isdir(f"{dep}/lib/node_modules"):
for module in os.listdir(f"{dep}/lib/node_modules"):
# ignore hidden directories
if module[0] == ".":
continue
if module[0] == '@':
for submodule in os.listdir(f"{dep}/lib/node_modules/{module}"):
pathlib.Path(f"{root}/{module}").mkdir(exist_ok=True)
print(f"installing: {module}/{submodule}")
origin =\
os.path.realpath(f"{dep}/lib/node_modules/{module}/{submodule}")
if not os.path.exists(f"{root}/{module}/{submodule}"):
os.symlink(origin, f"{root}/{module}/{submodule}")
else:
print(f"installing: {module}")
origin = os.path.realpath(f"{dep}/lib/node_modules/{module}")
if not os.path.isdir(f"{root}/{module}"):
os.symlink(origin, f"{root}/{module}")
else:
print(f"already exists: {root}/{module}")
if not os.path.isdir(root):
os.mkdir(root)
with open(os.environ.get("nodeDepsPath")) as f:
deps = f.read().split()
for dep in deps:
if os.path.isdir(f"{dep}/lib/node_modules"):
for module in os.listdir(f"{dep}/lib/node_modules"):
# ignore hidden directories
if module[0] == ".":
continue
if module[0] == "@":
for submodule in os.listdir(f"{dep}/lib/node_modules/{module}"):
pathlib.Path(f"{root}/{module}").mkdir(exist_ok=True)
print(f"installing: {module}/{submodule}")
origin = os.path.realpath(
f"{dep}/lib/node_modules/{module}/{submodule}"
)
if not os.path.exists(f"{root}/{module}/{submodule}"):
os.symlink(origin, f"{root}/{module}/{submodule}")
else:
print(f"installing: {module}")
origin = os.path.realpath(f"{dep}/lib/node_modules/{module}")
if not os.path.isdir(f"{root}/{module}"):
os.symlink(origin, f"{root}/{module}")
else:
print(f"already exists: {root}/{module}")
def collect_dependencies(root, depth):
if not os.path.isdir(root):
return []
dirs = os.listdir(root)
if not os.path.isdir(root):
return []
dirs = os.listdir(root)
currentDeps = []
for d in dirs:
if d.rpartition('/')[-1].startswith('@'):
subdirs = os.listdir(f"{root}/{d}")
for sd in subdirs:
cur_dir = f"{root}/{d}/{sd}"
currentDeps.append(f"{cur_dir}")
currentDeps = []
for d in dirs:
if d.rpartition("/")[-1].startswith("@"):
subdirs = os.listdir(f"{root}/{d}")
for sd in subdirs:
cur_dir = f"{root}/{d}/{sd}"
currentDeps.append(f"{cur_dir}")
else:
cur_dir = f"{root}/{d}"
currentDeps.append(cur_dir)
if depth == 0:
return currentDeps
else:
cur_dir = f"{root}/{d}"
currentDeps.append(cur_dir)
if depth == 0:
return currentDeps
else:
depsOfDeps =\
map(lambda dep: collect_dependencies(f"{dep}/node_modules", depth - 1), currentDeps)
result = []
for deps in depsOfDeps:
result += deps
return result
depsOfDeps = map(
lambda dep: collect_dependencies(f"{dep}/node_modules", depth - 1),
currentDeps,
)
result = []
for deps in depsOfDeps:
result += deps
return result
def symlink_sub_dependencies():
for dep in collect_dependencies(root, 1):
# compute module path
d1, d2 = dep.split('/')[-2:]
if d1.startswith('@'):
path = f"{root}/{d1}/{d2}"
else:
path = f"{root}/{d2}"
for dep in collect_dependencies(root, 1):
# compute module path
d1, d2 = dep.split("/")[-2:]
if d1.startswith("@"):
path = f"{root}/{d1}/{d2}"
else:
path = f"{root}/{d2}"
# check for collision
if os.path.isdir(path):
continue
# check for collision
if os.path.isdir(path):
continue
# create parent dir
pathlib.Path(os.path.dirname(path)).mkdir(parents=True, exist_ok=True)
# create parent dir
pathlib.Path(os.path.dirname(path)).mkdir(parents=True, exist_ok=True)
# symlink dependency
os.symlink(os.path.realpath(dep), path)
# symlink dependency
os.symlink(os.path.realpath(dep), path)
# create symlinks for executables (bin entries from package.json)
def symlink_bin(bin_dir, package_location, package_json, force=False):
if package_json and 'bin' in package_json and package_json['bin']:
bin = package_json['bin']
if package_json and "bin" in package_json and package_json["bin"]:
bin = package_json["bin"]
def link(name, relpath):
source = f'{bin_dir}/{name}'
sourceDir = os.path.dirname(source)
# create parent dir
pathlib.Path(sourceDir).mkdir(parents=True, exist_ok=True)
dest = os.path.relpath(f'{package_location}/{relpath}', sourceDir)
print(f"symlinking executable. dest: {dest}; source: {source}")
if force and os.path.lexists(source):
os.remove(source)
if not os.path.lexists(source):
os.symlink(dest, source)
def link(name, relpath):
source = f"{bin_dir}/{name}"
sourceDir = os.path.dirname(source)
# create parent dir
pathlib.Path(sourceDir).mkdir(parents=True, exist_ok=True)
dest = os.path.relpath(f"{package_location}/{relpath}", sourceDir)
print(f"symlinking executable. dest: {dest}; source: {source}")
if force and os.path.lexists(source):
os.remove(source)
if not os.path.lexists(source):
os.symlink(dest, source)
if isinstance(bin, str):
name = package_json['name'].split('/')[-1]
link(name, bin)
if isinstance(bin, str):
name = package_json["name"].split("/")[-1]
link(name, bin)
else:
for name, relpath in bin.items():
link(name, relpath)
else:
for name, relpath in bin.items():
link(name, relpath)
# checks if dependency is already installed in the current or parent dir.
def dependency_satisfied(root, pname, version):
if root == "/":
return False
if root == "/":
return False
parent = os.path.dirname(root)
parent = os.path.dirname(root)
if os.path.isdir(f"{root}/{pname}"):
package_json_file = f"{root}/{pname}/package.json"
if os.path.isfile(package_json_file):
if version == get_package_json(f"{root}/{pname}").get('version'):
return True
if os.path.isdir(f"{root}/{pname}"):
package_json_file = f"{root}/{pname}/package.json"
if os.path.isfile(package_json_file):
if version == get_package_json(f"{root}/{pname}").get("version"):
return True
return dependency_satisfied(parent, pname, version)
return dependency_satisfied(parent, pname, version)
# transforms symlinked dependencies into real copies
def symlinks_to_copies(node_modules):
sp.run(f"chmod +wx {node_modules}".split())
for dep in collect_dependencies(node_modules, 0):
sp.run(f"chmod +wx {node_modules}".split())
for dep in collect_dependencies(node_modules, 0):
# only handle symlinks to directories
if not os.path.islink(dep) or os.path.isfile(dep):
continue
# only handle symlinks to directories
if not os.path.islink(dep) or os.path.isfile(dep):
continue
d1, d2 = dep.split('/')[-2:]
if d1[0] == '@':
pname = f"{d1}/{d2}"
sp.run(f"chmod +wx {node_modules}/{d1}".split())
else:
pname = d2
package_json = get_package_json(dep)
if package_json is not None:
version = package_json['version']
if dependency_satisfied(os.path.dirname(node_modules), pname, version):
os.remove(dep)
continue
print(f"copying {dep}")
os.rename(dep, f"{dep}.bac")
os.mkdir(dep)
contents = os.listdir(f"{dep}.bac")
if contents != []:
for node in contents:
if os.path.isdir(f"{dep}.bac/{node}"):
shutil.copytree(f"{dep}.bac/{node}", f"{dep}/{node}", symlinks=True)
if os.path.isdir(f"{dep}/node_modules"):
symlinks_to_copies(f"{dep}/node_modules")
d1, d2 = dep.split("/")[-2:]
if d1[0] == "@":
pname = f"{d1}/{d2}"
sp.run(f"chmod +wx {node_modules}/{d1}".split())
else:
shutil.copy(f"{dep}.bac/{node}", f"{dep}/{node}")
os.remove(f"{dep}.bac")
symlink_bin(f"{bin_dir}", dep, package_json)
pname = d2
package_json = get_package_json(dep)
if package_json is not None:
version = package_json["version"]
if dependency_satisfied(os.path.dirname(node_modules), pname, version):
os.remove(dep)
continue
print(f"copying {dep}")
os.rename(dep, f"{dep}.bac")
os.mkdir(dep)
contents = os.listdir(f"{dep}.bac")
if contents != []:
for node in contents:
if os.path.isdir(f"{dep}.bac/{node}"):
shutil.copytree(f"{dep}.bac/{node}", f"{dep}/{node}", symlinks=True)
if os.path.isdir(f"{dep}/node_modules"):
symlinks_to_copies(f"{dep}/node_modules")
else:
shutil.copy(f"{dep}.bac/{node}", f"{dep}/{node}")
os.remove(f"{dep}.bac")
symlink_bin(f"{bin_dir}", dep, package_json)
def symlink_direct_bins():
deps = []
package_json_file = get_package_json(f"{os.path.abspath('.')}")
deps = []
package_json_file = get_package_json(f"{os.path.abspath('.')}")
if package_json_file:
if 'devDependencies' in package_json_file and package_json_file['devDependencies']:
for dep,_ in package_json_file['devDependencies'].items():
deps.append(dep)
if 'dependencies' in package_json_file and package_json_file['dependencies']:
for dep,_ in package_json_file['dependencies'].items():
deps.append(dep)
if package_json_file:
if (
"devDependencies" in package_json_file
and package_json_file["devDependencies"]
):
for dep, _ in package_json_file["devDependencies"].items():
deps.append(dep)
if "dependencies" in package_json_file and package_json_file["dependencies"]:
for dep, _ in package_json_file["dependencies"].items():
deps.append(dep)
for name in deps:
package_location = f"{root}/{name}"
package_json = get_package_json(package_location)
symlink_bin(f"{bin_dir}", package_location, package_json, force=True)
for name in deps:
package_location = f"{root}/{name}"
package_json = get_package_json(package_location)
symlink_bin(f"{bin_dir}", package_location, package_json, force=True)
# install direct deps
@@ -205,8 +213,8 @@ install_direct_dependencies()
symlink_sub_dependencies()
# symlinks to copies
if os.environ.get('installMethod') == 'copy':
symlinks_to_copies(root)
if os.environ.get("installMethod") == "copy":
symlinks_to_copies(root)
# symlink direct deps bins
symlink_direct_bins()
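
For orientation, a sketch of what collect_dependencies returns on a tiny, made-up node_modules tree (scoped packages are expanded one level):

import pathlib, tempfile

tree = tempfile.mkdtemp()
pathlib.Path(f"{tree}/@scope/pkg").mkdir(parents=True)
pathlib.Path(f"{tree}/plain").mkdir()
print(sorted(collect_dependencies(tree, 0)))
# -> ['<tree>/@scope/pkg', '<tree>/plain']  (paths abbreviated)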

View File

@@ -3,39 +3,40 @@ import os
import pathlib
with open('package.json', encoding="utf-8-sig") as f:
package_json = json.load(f)
with open("package.json", encoding="utf-8-sig") as f:
package_json = json.load(f)
out = os.environ.get('out')
out = os.environ.get("out")
# create symlinks for executables (bin entries from package.json)
def symlink_bin(bin_dir, package_json):
if 'bin' in package_json and package_json['bin']:
bin = package_json['bin']
if "bin" in package_json and package_json["bin"]:
bin = package_json["bin"]
def link(name, relpath):
source = f'{bin_dir}/{name}'
sourceDir = os.path.dirname(source)
# make target executable
os.chmod(relpath, 0o777)
# create parent dir
pathlib.Path(sourceDir).mkdir(parents=True, exist_ok=True)
dest = os.path.relpath(relpath, sourceDir)
print(f"symlinking executable. dest: {dest}; source: {source}")
# if a bin with this name exists, overwrite
if os.path.lexists(source):
os.remove(source)
os.symlink(dest, source)
def link(name, relpath):
source = f"{bin_dir}/{name}"
sourceDir = os.path.dirname(source)
# make target executable
os.chmod(relpath, 0o777)
# create parent dir
pathlib.Path(sourceDir).mkdir(parents=True, exist_ok=True)
dest = os.path.relpath(relpath, sourceDir)
print(f"symlinking executable. dest: {dest}; source: {source}")
# if a bin with this name exists, overwrite
if os.path.lexists(source):
os.remove(source)
os.symlink(dest, source)
if isinstance(bin, str):
name = package_json['name'].split('/')[-1]
link(name, bin)
if isinstance(bin, str):
name = package_json["name"].split("/")[-1]
link(name, bin)
else:
for name, relpath in bin.items():
link(name, relpath)
else:
for name, relpath in bin.items():
link(name, relpath)
# symlink current packages executables to $nodeModules/.bin
symlink_bin(f'{out}/lib/node_modules/.bin/', package_json)
symlink_bin(f"{out}/lib/node_modules/.bin/", package_json)
# symlink current packages executables to $out/bin
symlink_bin(f'{out}/bin/', package_json)
symlink_bin(f"{out}/bin/", package_json)

View File

@@ -3,12 +3,10 @@ import os
import sys
lock = json.load(sys.stdin)
version = os.environ.get('version')
version = os.environ.get("version")
# set default package version correctly
defaultPackage = lock['_generic']['defaultPackage']
lock['_generic']['packages'] = {
defaultPackage: version
}
defaultPackage = lock["_generic"]["defaultPackage"]
lock["_generic"]["packages"] = {defaultPackage: version}
print(json.dumps(lock, indent=2))

View File

@@ -9,89 +9,89 @@ import urllib.request
def main():
directory = sys.argv[1]
directory = sys.argv[1]
with open(sys.argv[2]) as f:
jsonInput = json.load(f)
with open(sys.argv[2]) as f:
jsonInput = json.load(f)
packages = {}
packages = {}
# loop over the downloaded files and compute:
# - url
# - sha256
# - format (sdist/wheel)
for path in list(glob(directory + '/*')):
_, _, file = path.rpartition('/')
# loop over the downloaded files and compute:
# - url
# - sha256
# - format (sdist/wheel)
for path in list(glob(directory + "/*")):
_, _, file = path.rpartition("/")
print(f"processing file: {file}")
print(f"processing file: {file}")
# example: charset_normalizer-2.0.4-py3-none-any.whl
if file.endswith('.whl'):
format = 'wheel'
pname, version = file.split('-')[:2]
with urllib.request.urlopen(f'https://pypi.org/pypi/{pname}/json') as f:
releasesForVersion = json.load(f)['releases'][version]
release = next(r for r in releasesForVersion if r['filename'] == file)
pyver = release['python_version']
# example: charset_normalizer-2.0.4-py3-none-any.whl
if file.endswith(".whl"):
format = "wheel"
pname, version = file.split("-")[:2]
with urllib.request.urlopen(f"https://pypi.org/pypi/{pname}/json") as f:
releasesForVersion = json.load(f)["releases"][version]
release = next(r for r in releasesForVersion if r["filename"] == file)
pyver = release["python_version"]
# example: requests-2.26.0.tar.gz
else:
format = 'sdist'
pname, version, _ = file.rpartition('-')
pyver = 'source'
# example: requests-2.26.0.tar.gz
else:
format = "sdist"
pname, _, version = file.rpartition("-")  # rpartition yields (head, sep, tail)
pyver = "source"
url = f"https://files.pythonhosted.org/packages/{pyver}/{pname[0]}/{pname}/{file}"
url = (
f"https://files.pythonhosted.org/packages/{pyver}/{pname[0]}/{pname}/{file}"
)
with open(path, 'rb') as f:
sha256 = f"sha256-{base64.b64encode(hashlib.sha256(f.read()).digest()).decode()}"
with open(path, "rb") as f:
sha256 = (
f"sha256-{base64.b64encode(hashlib.sha256(f.read()).digest()).decode()}"
)
packages[pname] = dict(
version=version,
url=url,
sha256=sha256,
format=format
packages[pname] = dict(version=version, url=url, sha256=sha256, format=format)
# create dream lock
# This translator is not aware of the exact dependency graph.
# This restricts us to using a single derivation builder later,
# which will install all packages at once
dream_lock = dict(
sources={},
_generic={
"subsystem": "python",
"defaultPackage": os.environ.get("NAME"),
"packages": {
os.environ.get("NAME"): os.environ.get("VERSION"),
},
"sourcesAggregatedHash": None,
"location": "",
},
_subsystem={
"application": jsonInput.get("application", False),
"pythonAttr": f"python{sys.version_info.major}{sys.version_info.minor}",
"sourceFormats": {
pname: data["format"] for pname, data in packages.items()
},
},
)
# create dream lock
# This translator is not aware of the exact dependency graph.
# This restricts us to use a single derivation builder later,
# which will install all packages at once
dream_lock = dict(
sources={},
_generic={
"subsystem": "python",
"defaultPackage": os.environ.get('NAME'),
"packages": {
os.environ.get('NAME'): os.environ.get('VERSION'),
},
"sourcesAggregatedHash": None,
"location": "",
},
_subsystem={
"application": jsonInput.get('application', False),
"pythonAttr": f"python{sys.version_info.major}{sys.version_info.minor}",
"sourceFormats":
{pname: data['format'] for pname, data in packages.items()}
}
)
# populate sources of dream lock
for pname, data in packages.items():
if pname not in dream_lock["sources"]:
dream_lock["sources"][pname] = {}
dream_lock["sources"][pname][data["version"]] = dict(
url=data["url"],
hash=data["sha256"],
type="http",
)
# populate sources of dream lock
for pname, data in packages.items():
if pname not in dream_lock['sources']:
dream_lock['sources'][pname] = {}
dream_lock['sources'][pname][data['version']] = dict(
url=data['url'],
hash=data['sha256'],
type='http',
)
# dump dream lock to $ouputFile
print(jsonInput['outputFile'])
dirPath = pathlib.Path(os.path.dirname(jsonInput['outputFile']))
dirPath.mkdir(parents=True, exist_ok=True)
with open(jsonInput['outputFile'], 'w') as lock:
json.dump(dream_lock, lock, indent=2)
# dump dream lock to $outputFile
print(jsonInput["outputFile"])
dirPath = pathlib.Path(os.path.dirname(jsonInput["outputFile"]))
dirPath.mkdir(parents=True, exist_ok=True)
with open(jsonInput["outputFile"], "w") as lock:
json.dump(dream_lock, lock, indent=2)
if __name__ == "__main__":
main()
main()

View File

@@ -2,56 +2,54 @@ import pytest
import nix_ffi
exampleDreamLock = dict(
_generic = dict(
defaultPackage="example",
packages=dict(
example="1.2.3",
_generic=dict(
defaultPackage="example",
packages=dict(
example="1.2.3",
),
subsystem="nodejs",
_subsystemAttrs={},
),
dependencies={},
cyclicDependencies={},
sources=dict(
example={
"1.2.3": dict(
type="path",
rootName=None,
rootVersion=None,
relPath="a/b/c",
),
},
),
subsystem = "nodejs",
_subsystemAttrs = {},
),
dependencies = {},
cyclicDependencies = {},
sources = dict(
example = {
"1.2.3": dict(
type = "path",
rootName = None,
rootVersion = None,
relPath = "a/b/c",
),
},
),
)
def test_dream_lock_inject():
result = nix_ffi.callNixFunction(
'utils.dream-lock.injectDependencies',
dreamLock=exampleDreamLock,
inject=dict(
example={
"1.2.3": [
[ "injected-package", "1.0.0" ]
]
}
),
)
assert result['dependencies']['example']['1.2.3'] == [dict(
name="injected-package",
version="1.0.0",
)]
result = nix_ffi.callNixFunction(
"utils.dream-lock.injectDependencies",
dreamLock=exampleDreamLock,
inject=dict(example={"1.2.3": [["injected-package", "1.0.0"]]}),
)
assert result["dependencies"]["example"]["1.2.3"] == [
dict(
name="injected-package",
version="1.0.0",
)
]
def test_dream_lock_replace_root_sources():
result = nix_ffi.callNixFunction(
'utils.dream-lock.replaceRootSources',
dreamLock=exampleDreamLock,
newSourceRoot=dict(
type = "http",
url = "something",
),
)
assert result['sources']['example']['1.2.3'] == dict(
type = "http",
url = "something",
dir = "a/b/c",
)
result = nix_ffi.callNixFunction(
"utils.dream-lock.replaceRootSources",
dreamLock=exampleDreamLock,
newSourceRoot=dict(
type="http",
url="something",
),
)
assert result["sources"]["example"]["1.2.3"] == dict(
type="http",
url="something",
dir="a/b/c",
)

View File

@ -4,9 +4,9 @@ import pytest
def get_projects_to_test():
tests = nix_ffi.eval(
'translators',
wrapper_code = '''
tests = nix_ffi.eval(
"translators",
wrapper_code="""
{result, ...}: let
lib = (import <nixpkgs> {}).lib;
l = lib // builtins;
@@ -25,123 +25,135 @@ def get_projects_to_test():
)
(l.attrValues result)
)
''',
)
result = []
for test in tests:
if test['type'] == 'all':
continue
result.append(dict(
project = dict(
name="test",
relPath="",
translator=test['translator'],
subsystemInfo={},
),
translator=test['translator'],
source = test['source'],
subsystem = test['subsystem'],
type = test['type'],
))
return result
""",
)
result = []
for test in tests:
if test["type"] == "all":
continue
result.append(
dict(
project=dict(
name="test",
relPath="",
translator=test["translator"],
subsystemInfo={},
),
translator=test["translator"],
source=test["source"],
subsystem=test["subsystem"],
type=test["type"],
)
)
return result
projects = get_projects_to_test()
def check_format_dependencies(dependencies):
assert isinstance(dependencies, list)
for dep in dependencies:
assert set(dep.keys()) == {'name', 'version'}
assert isinstance(dep['name'], str)
assert len(dep['name']) > 0
assert isinstance(dep['version'], str)
assert len(dep['version']) > 0
assert isinstance(dependencies, list)
for dep in dependencies:
assert set(dep.keys()) == {"name", "version"}
assert isinstance(dep["name"], str)
assert len(dep["name"]) > 0
assert isinstance(dep["version"], str)
assert len(dep["version"]) > 0
def check_format_sourceSpec(sourceSpec):
assert isinstance(sourceSpec, dict)
assert 'type' in sourceSpec
assert isinstance(sourceSpec, dict)
assert "type" in sourceSpec
@pytest.mark.parametrize("p", projects)
def test_packageName(p):
defaultPackage = nix_ffi.eval(
f"translatorsBySubsystem.{p['subsystem']}.{p['translator']}.finalTranslate",
params=dict(
project=p['project'],
source=p['source'],
),
wrapper_code = '''
defaultPackage = nix_ffi.eval(
f"translatorsBySubsystem.{p['subsystem']}.{p['translator']}.finalTranslate",
params=dict(
project=p["project"],
source=p["source"],
),
wrapper_code="""
{result, ...}:
result.inputs.defaultPackage
''',
)
assert isinstance(defaultPackage, str)
assert len(defaultPackage) > 0
""",
)
assert isinstance(defaultPackage, str)
assert len(defaultPackage) > 0
@pytest.mark.parametrize("p", projects)
def test_exportedPackages(p):
exportedPackages = nix_ffi.eval(
f"translatorsBySubsystem.{p['subsystem']}.{p['translator']}.finalTranslate",
params=dict(
project=p['project'],
source=p['source'],
),
wrapper_code = '''
exportedPackages = nix_ffi.eval(
f"translatorsBySubsystem.{p['subsystem']}.{p['translator']}.finalTranslate",
params=dict(
project=p["project"],
source=p["source"],
),
wrapper_code="""
{result, ...}:
result.inputs.exportedPackages
''',
)
assert isinstance(exportedPackages, dict)
assert len(exportedPackages) > 0
""",
)
assert isinstance(exportedPackages, dict)
assert len(exportedPackages) > 0
@pytest.mark.parametrize("p", projects)
def test_extraObjects(p):
extraObjects = nix_ffi.eval(
f"translatorsBySubsystem.{p['subsystem']}.{p['translator']}.finalTranslate",
params=dict(
project=p['project'],
source=p['source'],
),
wrapper_code = '''
extraObjects = nix_ffi.eval(
f"translatorsBySubsystem.{p['subsystem']}.{p['translator']}.finalTranslate",
params=dict(
project=p["project"],
source=p["source"],
),
wrapper_code="""
{result, ...}:
result.inputs.extraObjects
''',
)
assert isinstance(extraObjects, list)
for extra_obj in extraObjects:
assert set(extra_obj.keys()) == \
{'name', 'version', 'dependencies', 'sourceSpec'}
assert isinstance(extra_obj['name'], str)
assert len(extra_obj['name']) > 0
assert isinstance(extra_obj['version'], str)
assert len(extra_obj['version']) > 0
check_format_dependencies(extra_obj['dependencies'])
check_format_sourceSpec(extra_obj['sourceSpec'])
""",
)
assert isinstance(extraObjects, list)
for extra_obj in extraObjects:
assert set(extra_obj.keys()) == {
"name",
"version",
"dependencies",
"sourceSpec",
}
assert isinstance(extra_obj["name"], str)
assert len(extra_obj["name"]) > 0
assert isinstance(extra_obj["version"], str)
assert len(extra_obj["version"]) > 0
check_format_dependencies(extra_obj["dependencies"])
check_format_sourceSpec(extra_obj["sourceSpec"])
@pytest.mark.parametrize("p", projects)
def test_location(p):
location = nix_ffi.eval(
f"translatorsBySubsystem.{p['subsystem']}.{p['translator']}.finalTranslate",
params=dict(
project=p['project'],
source=p['source'],
),
wrapper_code = '''
location = nix_ffi.eval(
f"translatorsBySubsystem.{p['subsystem']}.{p['translator']}.finalTranslate",
params=dict(
project=p["project"],
source=p["source"],
),
wrapper_code="""
{result, ...}:
result.inputs.location
''',
)
assert isinstance(location, str)
""",
)
assert isinstance(location, str)
@pytest.mark.parametrize("p", projects)
def test_serializedRawObjects(p):
serializedRawObjects = nix_ffi.eval(
f"translatorsBySubsystem.{p['subsystem']}.{p['translator']}.finalTranslate",
params=dict(
project=p['project'],
source=p['source'],
),
wrapper_code = '''
serializedRawObjects = nix_ffi.eval(
f"translatorsBySubsystem.{p['subsystem']}.{p['translator']}.finalTranslate",
params=dict(
project=p["project"],
source=p["source"],
),
wrapper_code="""
{result, lib, ...}:
let
len = lib.length result.inputs.serializedRawObjects;
@@ -149,70 +161,74 @@ def test_serializedRawObjects(p):
# for performance reasons check only first/last 10 items of the list
(lib.sublist 0 10 result.inputs.serializedRawObjects)
++ (lib.sublist (lib.max (len - 10) 0) len result.inputs.serializedRawObjects)
''',
)
assert isinstance(serializedRawObjects, list)
assert len(serializedRawObjects) > 0
for raw_obj in serializedRawObjects:
assert isinstance(raw_obj, dict)
""",
)
assert isinstance(serializedRawObjects, list)
assert len(serializedRawObjects) > 0
for raw_obj in serializedRawObjects:
assert isinstance(raw_obj, dict)
@pytest.mark.parametrize("p", projects)
def test_subsystemName(p):
subsystemName = nix_ffi.eval(
f"translatorsBySubsystem.{p['subsystem']}.{p['translator']}.finalTranslate",
params=dict(
project=p['project'],
source=p['source'],
),
wrapper_code = '''
subsystemName = nix_ffi.eval(
f"translatorsBySubsystem.{p['subsystem']}.{p['translator']}.finalTranslate",
params=dict(
project=p["project"],
source=p["source"],
),
wrapper_code="""
{result, ...}:
result.inputs.subsystemName
''',
)
assert isinstance(subsystemName, str)
assert len(subsystemName) > 0
""",
)
assert isinstance(subsystemName, str)
assert len(subsystemName) > 0
@pytest.mark.parametrize("p", projects)
def test_subsystemAttrs(p):
subsystemAttrs = nix_ffi.eval(
f"translatorsBySubsystem.{p['subsystem']}.{p['translator']}.finalTranslate",
params=dict(
project=p['project'],
source=p['source'],
),
wrapper_code = '''
subsystemAttrs = nix_ffi.eval(
f"translatorsBySubsystem.{p['subsystem']}.{p['translator']}.finalTranslate",
params=dict(
project=p["project"],
source=p["source"],
),
wrapper_code="""
{result, ...}:
builtins.trace result.inputs.subsystemAttrs
result.inputs.subsystemAttrs
''',
)
assert isinstance(subsystemAttrs, dict)
""",
)
assert isinstance(subsystemAttrs, dict)
@pytest.mark.parametrize("p", projects)
def test_translatorName(p):
translatorName = nix_ffi.eval(
f"translatorsBySubsystem.{p['subsystem']}.{p['translator']}.finalTranslate",
params=dict(
project=p['project'],
source=p['source'],
),
wrapper_code = '''
translatorName = nix_ffi.eval(
f"translatorsBySubsystem.{p['subsystem']}.{p['translator']}.finalTranslate",
params=dict(
project=p["project"],
source=p["source"],
),
wrapper_code="""
{result, ...}:
result.inputs.translatorName
''',
)
assert isinstance(translatorName, str)
assert len(translatorName) > 0
""",
)
assert isinstance(translatorName, str)
assert len(translatorName) > 0
@pytest.mark.parametrize("p", projects)
def test_extractors(p):
finalObjects = nix_ffi.eval(
f"translatorsBySubsystem.{p['subsystem']}.{p['translator']}.finalTranslate",
params=dict(
project=p['project'],
source=p['source'],
),
wrapper_code = '''
finalObjects = nix_ffi.eval(
f"translatorsBySubsystem.{p['subsystem']}.{p['translator']}.finalTranslate",
params=dict(
project=p["project"],
source=p["source"],
),
wrapper_code="""
{result, dlib, ...}:
let
l = builtins;
@@ -228,25 +244,30 @@ def test_extractors(p):
s.mkRelevantFinalObjects exportedFinalObjects allDependencies;
in
relevantFinalObjects ++ (inputs.extraObjects or [])
''',
)
assert isinstance(finalObjects, list)
assert len(finalObjects) > 0
for finalObj in finalObjects:
assert (set(finalObj.keys()) - {'rawObj', 'key'}) == \
{'name', 'version', 'sourceSpec', 'dependencies'}
check_format_dependencies(finalObj['dependencies'])
check_format_sourceSpec(finalObj['sourceSpec'])
""",
)
assert isinstance(finalObjects, list)
assert len(finalObjects) > 0
for finalObj in finalObjects:
assert (set(finalObj.keys()) - {"rawObj", "key"}) == {
"name",
"version",
"sourceSpec",
"dependencies",
}
check_format_dependencies(finalObj["dependencies"])
check_format_sourceSpec(finalObj["sourceSpec"])
@pytest.mark.parametrize("p", projects)
def test_keys(p):
objectsByKey = nix_ffi.eval(
f"translatorsBySubsystem.{p['subsystem']}.{p['translator']}.finalTranslate",
params=dict(
project=p['project'],
source=p['source'],
),
wrapper_code = '''
objectsByKey = nix_ffi.eval(
f"translatorsBySubsystem.{p['subsystem']}.{p['translator']}.finalTranslate",
params=dict(
project=p["project"],
source=p["source"],
),
wrapper_code="""
{result, dlib, ...}:
let
l = builtins;
@@ -273,12 +294,16 @@ def test_keys(p):
inputs.keys;
in
objectsByKey
''',
)
assert isinstance(objectsByKey, dict)
for key_name, objects in objectsByKey.items():
for finalObj in objects.values():
assert set(finalObj.keys()) - {'rawObj', 'key'} == \
{'name', 'version', 'sourceSpec', 'dependencies'}
check_format_dependencies(finalObj['dependencies'])
check_format_sourceSpec(finalObj['sourceSpec'])
""",
)
assert isinstance(objectsByKey, dict)
for key_name, objects in objectsByKey.items():
for finalObj in objects.values():
assert set(finalObj.keys()) - {"rawObj", "key"} == {
"name",
"version",
"sourceSpec",
"dependencies",
}
check_format_dependencies(finalObj["dependencies"])
check_format_sourceSpec(finalObj["sourceSpec"])

View File

@@ -1,75 +1,78 @@
import pytest
import nix_ffi
@pytest.mark.parametrize("shortcut, expected", [
(
'https://foo',
dict (
type = "http",
url = "https://foo",
),
),
(
'http://foo/bar',
dict (
type = "http",
url = "http://foo/bar",
),
),
(
'github:owner/repo/v1.2.3',
dict (
type = "github",
owner = "owner",
repo = "repo",
rev = "v1.2.3",
),
),
# with arguments
(
'git+ssh://github.com/owner/repo?rev=refs/heads/v1.2.3&dir=sub/dir',
dict (
type = "git",
url = "ssh://github.com/owner/repo",
rev = "refs/heads/v1.2.3",
dir = "sub/dir",
),
),
(
'http://foo/bar?kwarg1=foo&dir=sub/dir',
dict (
type = "http",
url = "http://foo/bar?kwarg1=foo",
dir = "sub/dir",
),
),
(
'github:owner/repo/v1.2.3?kwarg1=foo&dir=sub/dir',
dict (
type = "github",
owner = "owner",
repo = "repo",
rev = "v1.2.3",
kwarg1 = "foo",
dir = "sub/dir",
),
),
(
'github:photoview/photoview/master?dir=lol',
dict (
type = "github",
owner = "photoview",
repo = "photoview",
rev = "master",
dir = "lol",
),
),
])
@pytest.mark.parametrize(
"shortcut, expected",
[
(
"https://foo",
dict(
type="http",
url="https://foo",
),
),
(
"http://foo/bar",
dict(
type="http",
url="http://foo/bar",
),
),
(
"github:owner/repo/v1.2.3",
dict(
type="github",
owner="owner",
repo="repo",
rev="v1.2.3",
),
),
# with arguments
(
"git+ssh://github.com/owner/repo?rev=refs/heads/v1.2.3&dir=sub/dir",
dict(
type="git",
url="ssh://github.com/owner/repo",
rev="refs/heads/v1.2.3",
dir="sub/dir",
),
),
(
"http://foo/bar?kwarg1=foo&dir=sub/dir",
dict(
type="http",
url="http://foo/bar?kwarg1=foo",
dir="sub/dir",
),
),
(
"github:owner/repo/v1.2.3?kwarg1=foo&dir=sub/dir",
dict(
type="github",
owner="owner",
repo="repo",
rev="v1.2.3",
kwarg1="foo",
dir="sub/dir",
),
),
(
"github:photoview/photoview/master?dir=lol",
dict(
type="github",
owner="photoview",
repo="photoview",
rev="master",
dir="lol",
),
),
],
)
def test_translateShortcut(shortcut, expected):
result = nix_ffi.callNixFunction(
'functions.fetchers.translateShortcut',
shortcut=shortcut,
computeHash=False,
)
assert result == expected
result = nix_ffi.callNixFunction(
"functions.fetchers.translateShortcut",
shortcut=shortcut,
computeHash=False,
)
assert result == expected

View File

@@ -1,9 +1,13 @@
import pytest
import nix_ffi
@pytest.mark.parametrize("expected, versions", [
('3', [ '2', '3', '1' ]),
])
@pytest.mark.parametrize(
"expected, versions",
[
("3", ["2", "3", "1"]),
],
)
def test_latestVersion(expected, versions):
result = nix_ffi.callNixFunction('dlib.latestVersion', versions=versions)
result = nix_ffi.callNixFunction("dlib.latestVersion", versions=versions)
assert result == expected

View File

@@ -6,3 +6,7 @@ excludes = [
"src/templates/translators/*",
"src/modules/_template/*",
]
[formatter.py]
command = "black"
includes = ["*.py"]
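
With this formatter entry, treefmt dispatches *.py files to black next to the existing Nix formatting; a hypothetical CI smoke test, assuming treefmt and black are on PATH:

import subprocess

# --fail-on-change makes treefmt exit non-zero if any file needed reformatting
subprocess.run(["treefmt", "--clear-cache", "--fail-on-change"], check=True)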