sapling/eden/scm/tests/dummyssh3.py

141 lines
3.5 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
import os
import shlex
import subprocess
import sys
import threading
from typing import Optional, TypeVar
_T = TypeVar("_T")


def none_throws(optional: Optional[_T]) -> _T:
    """Return ``optional`` unchanged, failing loudly if it is ``None``.

    Args:
        optional: a value that may be ``None``.

    Returns:
        The same object that was passed in.

    Raises:
        AssertionError: if ``optional`` is ``None``.
    """
    # Raise explicitly instead of using an ``assert`` statement: asserts are
    # removed when Python runs with -O, which would let None slip through.
    if optional is None:
        raise AssertionError("Unexpected None")
    return optional
# Switch the working directory to the path held in $TESTTMP; none_throws
# fails if the variable is unset. The relative "dummylog" path opened further
# down is resolved against this directory.
os.chdir(none_throws(os.environ.get("TESTTMP")))
def parse(cmd):
    """Split the command string ``cmd`` into an argument list.

    Matches hg-ssh-wrapper.

    Uses ``shlex.split``. If the string cannot be tokenized, an
    "Illegal command" message is printed to stderr and the process exits
    with status 255.
    """
    try:
        argv = shlex.split(cmd)
    except ValueError as err:
        print('Illegal command "%s": %s\n' % (cmd, err), file=sys.stderr)
        sys.exit(255)
    return argv
def parse_repo_path(path):
    """Split ``path`` of the form ``repo`` or ``repo?marker``.

    Matches hg-ssh-wrapper.

    Returns a ``(repo, marker)`` tuple; ``marker`` is ``None`` when ``path``
    contains no ``?``. If ``path`` contains more than one ``?``, an
    "Illegal repo name" message is printed to stderr and the process exits
    with status 255.
    """
    pieces = path.split("?")
    # str.split always yields at least one piece, so only "too many" is an error.
    if len(pieces) > 2:
        print("Illegal repo name: %s\n" % "?".join(pieces), file=sys.stderr)
        sys.exit(255)
    repo = pieces[0]
    marker = pieces[1] if len(pieces) == 2 else None
    return repo, marker
# Skipping SSH options: the first argument that does not start with "-" is
# taken to be the host.
host_index = 1
while host_index < len(sys.argv) and sys.argv[host_index].startswith("-"):
    host_index += 1

# Only the fake host "user@dummy" is accepted. A command line with no host at
# all (nothing but options, or no arguments) is rejected the same way instead
# of failing with an IndexError.
if host_index >= len(sys.argv) or sys.argv[host_index] != "user@dummy":
    sys.exit(-1)

# Export SSH_CLIENT as "<ip> 1 2", taking the ip from SSH_IP_OVERRIDE, then
# LOCALIP, then falling back to 127.0.0.1.
os.environ["SSH_CLIENT"] = "%s 1 2" % os.environ.get(
    "SSH_IP_OVERRIDE", os.environ.get("LOCALIP", "127.0.0.1")
)

# Append one line listing every argument (1-based index) to "dummylog" in the
# current directory. The with-block closes the file even if a write fails.
with open("dummylog", "ab") as log:
    log.write(b"Got arguments")
    for i, arg in enumerate(sys.argv[1:], start=1):
        log.write(b" %d:%s" % (i, arg.encode("utf-8")))
    log.write(b"\n")
# The command to run is the single argument right after the host.
# NOTE(review): this raises IndexError when nothing follows the host argument.
hgcmd = sys.argv[host_index + 1]
if os.name == "nt":
    # hack to make simple unix single quote quoting work on windows
    hgcmd = hgcmd.replace("'", '"')

cmdargv = parse(hgcmd)
if cmdargv[:2] == ["hg", "-R"] and cmdargv[3:] == ["serve", "--stdio"]:
    # "hg -R <repo>[?marker] serve --stdio": drop the "?marker" suffix from
    # the repo argument; the "read_copy" marker redirects to "<repo>_copy".
    repo_path, marker = parse_repo_path(cmdargv[2])
    if marker == "read_copy":
        repo_path += "_copy"
    cmdargv[2] = repo_path
    hgcmd = subprocess.list2cmdline(cmdargv)

if "hgcli" in hgcmd:
    # Commands mentioning hgcli get connection and TLS options appended.
    certdir = os.environ.get("HGTEST_CERTDIR") or os.environ.get("TEST_CERTS")
    if certdir is None:
        raise ValueError("No cert dir")
    cert, capem, privatekey = (
        os.path.join(certdir, filename)
        for filename in ("localhost.crt", "root-ca.crt", "localhost.key")
    )

    localip = os.environ.get("LOCALIP", "127.0.0.1")
    if ":" in localip:
        # this is ipv6, put it in brackets
        localip = "[" + localip + "]"

    mononoke_socket = none_throws(os.getenv("MONONOKE_SOCKET"))
    hgcmd += (" --mononoke-path %s:" % localip) + mononoke_socket
    hgcmd += " --cert %s --ca-pem %s --private-key %s --common-name localhost" % (
        cert,
        capem,
        privatekey,
    )

    mock_username = os.environ.get("MOCK_USERNAME")
    if mock_username:
        hgcmd += " --mock-username '{}'".format(mock_username)

    if os.environ.get("CLIENT_DEBUG") == "true":
        hgcmd += " --client-debug"
# Run the assembled command through the shell and exit with its status.
if os.environ.get("DUMMYSSH_STABLE_ORDER"):
    # Buffer all stderr outputs until the end of connection. This reduces test
    # flakiness where stderr and stdout output order is nondeterministic.
    with subprocess.Popen(hgcmd, shell=True, stderr=subprocess.PIPE) as p:
        # communicate() reads the stderr pipe until EOF and then waits for the
        # child to exit. stdin and stdout are not piped, so the child keeps
        # using this process's own descriptors for them.
        _, errdata = p.communicate()
    # Replay the captured bytes unmodified on our own stderr.
    sys.stderr.buffer.write(errdata)
    sys.stderr.flush()
    sys.exit(p.returncode)
else:
    r = os.system(hgcmd)
    # Collapse whatever os.system returned to exit status 0 (success) or 1.
    sys.exit(1 if r else 0)