chore(formatting): reformat v1 code - python files

This commit is contained in:
DavHau 2023-03-12 22:29:53 +07:00
parent d96e7234e3
commit 600a2cbecc
3 changed files with 93 additions and 65 deletions

View File

@ -8,7 +8,11 @@ import urllib.request
from pathlib import Path from pathlib import Path
import certifi import certifi
from packaging.utils import canonicalize_name, parse_sdist_filename, parse_wheel_filename from packaging.utils import (
canonicalize_name,
parse_sdist_filename,
parse_wheel_filename,
)
HOME = Path(os.getcwd()) HOME = Path(os.getcwd())
@ -17,10 +21,11 @@ PYTHON_BIN = os.getenv("pythonBin")
PYTHON_WITH_MITM_PROXY = os.getenv("pythonWithMitmproxy") PYTHON_WITH_MITM_PROXY = os.getenv("pythonWithMitmproxy")
FILTER_PYPI_RESPONSE_SCRIPTS = os.getenv("filterPypiResponsesScript") FILTER_PYPI_RESPONSE_SCRIPTS = os.getenv("filterPypiResponsesScript")
PIP_VERSION = os.getenv("pipVersion") PIP_VERSION = os.getenv("pipVersion")
PIP_FLAGS = os.getenv('pipFlags') PIP_FLAGS = os.getenv("pipFlags")
ONLY_BINARY_FLAGS = os.getenv('onlyBinaryFlags') ONLY_BINARY_FLAGS = os.getenv("onlyBinaryFlags")
REQUIREMENTS_LIST = os.getenv('requirementsList') REQUIREMENTS_LIST = os.getenv("requirementsList")
REQUIREMENTS_FLAGS = os.getenv('requirementsFlags') REQUIREMENTS_FLAGS = os.getenv("requirementsFlags")
def get_max_date(): def get_max_date():
try: try:
@ -41,23 +46,23 @@ def start_mitmproxy(port):
proc = subprocess.Popen( proc = subprocess.Popen(
[ [
f"{PYTHON_WITH_MITM_PROXY}/bin/mitmdump", f"{PYTHON_WITH_MITM_PROXY}/bin/mitmdump",
"--listen-port", str(port), "--listen-port",
"--ignore-hosts", ".*files.pythonhosted.org.*", str(port),
"--script", FILTER_PYPI_RESPONSE_SCRIPTS "--ignore-hosts",
".*files.pythonhosted.org.*",
"--script",
FILTER_PYPI_RESPONSE_SCRIPTS,
], ],
env = { env={"MAX_DATE": os.getenv("MAX_DATE"), "HOME": HOME},
"MAX_DATE": os.getenv('MAX_DATE'),
"HOME": HOME
}
) )
return proc return proc
def wait_for_proxy(proxy_port, cafile): def wait_for_proxy(proxy_port, cafile):
timeout = time.time() + 60 * 5 timeout = time.time() + 60 * 5
req = urllib.request.Request('https://pypi.org') req = urllib.request.Request("https://pypi.org")
req.set_proxy(f'127.0.0.1:{proxy_port}', 'http') req.set_proxy(f"127.0.0.1:{proxy_port}", "http")
req.set_proxy(f'127.0.0.1:{proxy_port}', 'https') req.set_proxy(f"127.0.0.1:{proxy_port}", "https")
context = ssl.create_default_context(cafile=cafile) context = ssl.create_default_context(cafile=cafile)
while time.time() < timeout: while time.time() < timeout:
@ -84,15 +89,16 @@ def generate_ca_bundle(path):
f.write(certifi_cacert) f.write(certifi_cacert)
return path return path
def create_venv(path):
    """Create a fresh virtual environment at *path* using the configured interpreter.

    Raises CalledProcessError if venv creation fails (check=True).
    """
    cmd = [PYTHON_BIN, "-m", "venv", path]
    subprocess.run(cmd, check=True)
def pip(venv_path, *args):
    """Invoke the venv's own pip binary with *args*; raise on non-zero exit."""
    pip_bin = f"{venv_path}/bin/pip"
    subprocess.run([pip_bin, *args], check=True)
if __name__ == '__main__': if __name__ == "__main__":
OUT.mkdir() OUT.mkdir()
dist_path = OUT / "dist" dist_path = OUT / "dist"
names_path = OUT / "names" names_path = OUT / "names"
@ -104,35 +110,44 @@ if __name__ == '__main__':
proxy = start_mitmproxy(proxy_port) proxy = start_mitmproxy(proxy_port)
venv_path = Path('.venv').absolute() venv_path = Path(".venv").absolute()
create_venv(venv_path) create_venv(venv_path)
pip(venv_path, 'install', '--upgrade', f'pip=={PIP_VERSION}') pip(venv_path, "install", "--upgrade", f"pip=={PIP_VERSION}")
cafile = generate_ca_bundle(HOME / ".ca-cert.pem") cafile = generate_ca_bundle(HOME / ".ca-cert.pem")
wait_for_proxy(proxy_port, cafile) wait_for_proxy(proxy_port, cafile)
optional_flags = [PIP_FLAGS, ONLY_BINARY_FLAGS, REQUIREMENTS_LIST, REQUIREMENTS_FLAGS] optional_flags = [
PIP_FLAGS,
ONLY_BINARY_FLAGS,
REQUIREMENTS_LIST,
REQUIREMENTS_FLAGS,
]
optional_flags = " ".join(filter(None, optional_flags)).split(" ") optional_flags = " ".join(filter(None, optional_flags)).split(" ")
pip( pip(
venv_path, venv_path,
'download', "download",
'--no-cache', "--no-cache",
'--dest', dist_path, "--dest",
'--progress-bar', 'off', dist_path,
'--proxy', f'https://localhost:{proxy_port}', "--progress-bar",
'--cert', cafile, "off",
*optional_flags "--proxy",
f"https://localhost:{proxy_port}",
"--cert",
cafile,
*optional_flags,
) )
proxy.kill() proxy.kill()
for dist_file in dist_path.iterdir(): for dist_file in dist_path.iterdir():
if dist_file.suffix == '.whl': if dist_file.suffix == ".whl":
name = parse_wheel_filename(dist_file.name)[0] name = parse_wheel_filename(dist_file.name)[0]
else: else:
name = parse_sdist_filename(dist_file.name)[0] name = parse_sdist_filename(dist_file.name)[0]
pname = canonicalize_name(name) pname = canonicalize_name(name)
name_path = names_path / pname name_path = names_path / pname
print(f'creating link {name_path} -> {dist_file}') print(f"creating link {name_path} -> {dist_file}")
name_path.mkdir() name_path.mkdir()
(name_path / dist_file.name).symlink_to(f"../../dist/{dist_file.name}") (name_path / dist_file.name).symlink_to(f"../../dist/{dist_file.name}")

View File

@ -23,8 +23,10 @@ from mitmproxy import http
Query the pypi json api to get timestamps for all release files of the given pname. Query the pypi json api to get timestamps for all release files of the given pname.
return all file names which are newer than the given timestamp return all file names which are newer than the given timestamp
""" """
def get_files_to_hide(pname, max_ts): def get_files_to_hide(pname, max_ts):
ca_file = Path(os.getenv('HOME')) / ".ca-cert.pem" ca_file = Path(os.getenv("HOME")) / ".ca-cert.pem"
context = ssl.create_default_context(cafile=ca_file) context = ssl.create_default_context(cafile=ca_file)
if not ca_file.exists(): if not ca_file.exists():
print("mitmproxy ca not found") print("mitmproxy ca not found")
@ -33,18 +35,18 @@ def get_files_to_hide(pname, max_ts):
# query the api # query the api
url = f"https://pypi.org/pypi/{pname}/json" url = f"https://pypi.org/pypi/{pname}/json"
req = Request(url) req = Request(url)
req.add_header('Accept-Encoding', 'gzip') req.add_header("Accept-Encoding", "gzip")
with urlopen(req, context=context) as response: with urlopen(req, context=context) as response:
content = gzip.decompress(response.read()) content = gzip.decompress(response.read())
resp = json.loads(content) resp = json.loads(content)
# collect files to hide # collect files to hide
files = set() files = set()
for ver, releases in resp['releases'].items(): for ver, releases in resp["releases"].items():
for release in releases: for release in releases:
ts = dateutil.parser.parse(release['upload_time']).timestamp() ts = dateutil.parser.parse(release["upload_time"]).timestamp()
if ts > max_ts: if ts > max_ts:
files.add(release['filename']) files.add(release["filename"])
return files return files
@ -80,14 +82,16 @@ Response format:
} }
} }
""" """
def response(flow: http.HTTPFlow) -> None:
    """Mitmproxy response hook: filter pypi /simple/ API responses.

    Removes release files uploaded after the MAX_DATE cutoff (``max_ts``)
    from the JSON body so pip never sees them. Non-/simple/ requests pass
    through untouched.
    """
    if "/simple/" not in flow.request.url:
        return
    # The last path component of a /simple/ URL is the package name.
    pname = flow.request.url.strip("/").split("/")[-1]
    badFiles = get_files_to_hide(pname, max_ts)
    data = json.loads(flow.response.text)
    if badFiles:
        # Fixed typo: "form" -> "from" in the log message.
        print(f"removing the following files from the API response:\n {badFiles}")

    # E731: use an inline predicate instead of a lambda bound to a name.
    def _keep(file):
        return file["filename"] not in badFiles

    data["files"] = list(filter(_keep, data["files"]))
    flow.response.text = json.dumps(data)

View File

@ -1,6 +1,6 @@
#!/usr/bin/env nix-shell #!/usr/bin/env nix-shell
#! nix-shell -i python3 -p python3 python3Packages.pkginfo python3Packages.packaging #! nix-shell -i python3 -p python3 python3Packages.pkginfo python3Packages.packaging
''' """
Given a directory of python source distributions (.tar.gz) and wheels, Given a directory of python source distributions (.tar.gz) and wheels,
return a JSON representation of their dependency tree. return a JSON representation of their dependency tree.
@ -24,7 +24,7 @@ dependency declarations.
The output is a list of tuples. First element in each tuple is the package name, The output is a list of tuples. First element in each tuple is the package name,
second a list of dependencies. Output is sorted by the number of dependencies, second a list of dependencies. Output is sorted by the number of dependencies,
so that leafs of the dependency tree come first, the package to install last. so that leafs of the dependency tree come first, the package to install last.
''' """
import sys import sys
import tarfile import tarfile
@ -33,11 +33,15 @@ from pathlib import Path
from pkginfo import SDist, Wheel from pkginfo import SDist, Wheel
from packaging.requirements import Requirement from packaging.requirements import Requirement
from packaging.utils import parse_sdist_filename, parse_wheel_filename, canonicalize_name from packaging.utils import (
parse_sdist_filename,
parse_wheel_filename,
canonicalize_name,
)
def _is_source_dist(pkg_file): def _is_source_dist(pkg_file):
return pkg_file.suffixes[-2:] == ['.tar', '.gz'] return pkg_file.suffixes[-2:] == [".tar", ".gz"]
def _get_name_version(pkg_file): def _get_name_version(pkg_file):
@ -50,7 +54,7 @@ def _get_name_version(pkg_file):
def get_pkg_info(pkg_file): def get_pkg_info(pkg_file):
try: try:
if pkg_file.suffix == '.whl': if pkg_file.suffix == ".whl":
return Wheel(str(pkg_file)) return Wheel(str(pkg_file))
elif _is_source_dist(pkg_file): elif _is_source_dist(pkg_file):
return SDist(str(pkg_file)) return SDist(str(pkg_file))
@ -63,7 +67,7 @@ def get_pkg_info(pkg_file):
def _is_required_dependency(requirement): def _is_required_dependency(requirement):
# We set the extra field to an empty string to effectively ignore all optional # We set the extra field to an empty string to effectively ignore all optional
# dependencies for now. # dependencies for now.
return not requirement.marker or requirement.marker.evaluate({'extra': ""}) return not requirement.marker or requirement.marker.evaluate({"extra": ""})
def parse_requirements_txt(pkg_file): def parse_requirements_txt(pkg_file):
@ -72,7 +76,8 @@ def parse_requirements_txt(pkg_file):
requirements = [ requirements = [
Requirement(req) Requirement(req)
for req in requirements_txt.split("\n") for req in requirements_txt.split("\n")
if req and not req.startswith("#")] if req and not req.startswith("#")
]
return requirements return requirements
@ -80,18 +85,18 @@ def read_requirements_txt(source_dist_file):
name, version = parse_sdist_filename(source_dist_file.name) name, version = parse_sdist_filename(source_dist_file.name)
with tarfile.open(source_dist_file) as tar: with tarfile.open(source_dist_file) as tar:
try: try:
with tar.extractfile(f'{name}-{version}/requirements.txt') as f: with tar.extractfile(f"{name}-{version}/requirements.txt") as f:
return f.read().decode('utf-8') return f.read().decode("utf-8")
except KeyError as e: except KeyError as e:
return return
def usage():
    """Print a one-line invocation hint and terminate with exit status 1."""
    script = sys.argv[0]
    print(f"{script} <pkgs-directory>")
    sys.exit(1)
if __name__ == '__main__': if __name__ == "__main__":
if len(sys.argv) != 2: if len(sys.argv) != 2:
usage() usage()
pkgs_path = Path(sys.argv[1]) pkgs_path = Path(sys.argv[1])
@ -113,8 +118,12 @@ if __name__ == '__main__':
requirements = parse_requirements_txt(pkg_file) requirements = parse_requirements_txt(pkg_file)
requirements = filter(_is_required_dependency, requirements) requirements = filter(_is_required_dependency, requirements)
dependencies.append({'name': name, 'dependencies': [canonicalize_name(req.name) for req in requirements]}) dependencies.append(
{
"name": name,
"dependencies": [canonicalize_name(req.name) for req in requirements],
}
)
dependencies = sorted(dependencies, key=lambda d: len(d["dependencies"])) dependencies = sorted(dependencies, key=lambda d: len(d["dependencies"]))
print(json.dumps(dependencies, indent=2)) print(json.dumps(dependencies, indent=2))