#!/usr/bin/env python
#
# Run a Mal test file against a Mal implementation.
#
# The implementation is started as a child process (on a pseudo-tty by
# default, or on plain pipes with --no-pty). Each form of the test file is
# written to the child and everything the child prints up to its next prompt
# is compared with the expected output recorded in the test file.

from __future__ import print_function

import argparse
import atexit
import codecs
import os
import platform
import re
import signal
import sys
import time
from collections import deque
from select import select
from subprocess import PIPE, STDOUT, Popen

# Pseudo-TTY and terminal manipulation
import array
import fcntl
import pty
import termios

IS_PY_3 = sys.version_info[0] == 3

# Optional output files; opened by main() when requested on the command line.
debug_file = None
log_file = None


def debug(data):
    """Append *data* to the debug file, if one is open."""
    if debug_file:
        debug_file.write(data)
        debug_file.flush()


def log(data, end='\n'):
    """Print *data* followed by *end*, mirroring it to the log file if open."""
    if log_file:
        log_file.write(data + end)
        log_file.flush()
    print(data, end=end)
    sys.stdout.flush()


# Line separator expected in the output captured from the child.
# TODO: do we need to support '\n' too
if "CYGWIN_NT" in platform.system():
    # TODO: this is weird, is this really right on Cygwin?
    sep = "\n\r\n"
else:
    sep = "\r\n"

rundir = None

# Prompts awaited at startup / after pre-eval, and after each test form.
START_PROMPTS = ['user> ', 'mal-user> ']
TEST_PROMPTS = ['\r\nuser> ', '\nuser> ',
                '\r\nmal-user> ', '\nmal-user> ']

# Rustyline ANSI CSI codes that are stripped from the captured output:
# - [6C - CR + cursor forward
# - [6K - CR + erase in line
_RUSTYLINE_CSI = re.compile(r'\r\x1B\[[0-9]*[CK]')

# Report templates (filled with the % operator).
_FAILURE_TEMPLATE = ("%sFAILED TEST (line %d): %s -> [%s,%s]:\n"
                     " Expected : %s\n"
                     " Got : %s")
_RESULTS_TEMPLATE = ("\n"
                     "TEST RESULTS (for %s):\n"
                     " %3d: soft failing tests\n"
                     " %3d: failing tests\n"
                     " %3d: passing tests\n"
                     " %3d: total tests\n")

parser = argparse.ArgumentParser(
    description="Run a test file against a Mal implementation")
parser.add_argument('--rundir',
                    help="change to the directory before running tests")
parser.add_argument('--start-timeout', default=10, type=int,
                    help="default timeout for initial prompt")
parser.add_argument('--test-timeout', default=20, type=int,
                    help="default timeout for each individual test action")
parser.add_argument('--pre-eval', default=None, type=str,
                    help="Mal code to evaluate prior to running the test")
parser.add_argument('--no-pty', action='store_true',
                    help="Use direct pipes instead of pseudo-tty")
parser.add_argument('--log-file', type=str,
                    help="Write messages to the named file in addition the screen")
parser.add_argument('--debug-file', type=str,
                    help="Write all test interaction the named file")
parser.add_argument('--hard', action='store_true',
                    help="Turn soft tests following a ';>>> soft=True' into hard failures")

# Control whether deferrable and optional tests are executed
parser.add_argument('--deferrable', dest='deferrable', action='store_true',
                    help="Enable deferrable tests that follow a ';>>> deferrable=True'")
parser.add_argument('--no-deferrable', dest='deferrable', action='store_false',
                    help="Disable deferrable tests that follow a ';>>> deferrable=True'")
parser.set_defaults(deferrable=True)
parser.add_argument('--optional', dest='optional', action='store_true',
                    help="Enable optional tests that follow a ';>>> optional=True'")
parser.add_argument('--no-optional', dest='optional', action='store_false',
                    help="Disable optional tests that follow a ';>>> optional=True'")
parser.set_defaults(optional=True)

parser.add_argument('test_file', type=argparse.FileType('r'),
                    help="a test file formatted as with mal test data")
parser.add_argument('mal_cmd', nargs="*",
                    help="Mal implementation command line. Use '--' to "
                         "specify a Mal command line with dashed options.")


class Runner(object):
    """Child process running a Mal implementation, plus its I/O channel.

    Attributes set by the constructor:
      p           -- the Popen object (None once cleanup() has run)
      stdin       -- file object written to by writeline()
      stdout      -- file object read by read_to_prompt()
      buf         -- output received from the child but not yet consumed
      last_prompt -- the prompt pattern that matched most recently
    """

    # Incremental UTF-8 decoder (Python 3 only); created on first use so that
    # a multi-byte character split across one-byte reads decodes correctly.
    _decoder = None

    def __init__(self, args, no_pty=False):
        """Start the command *args*, on pipes if *no_pty* else on a pty."""
        self.no_pty = no_pty
        self.p = None

        # Cleanup child process on exit
        atexit.register(self.cleanup)

        # Give the child a copy of the environment with the overrides below,
        # leaving this process's own environment untouched.
        env = dict(os.environ)
        env['TERM'] = 'dumb'
        env['INPUTRC'] = '/dev/null'
        env['PERL_RL'] = 'false'

        if no_pty:
            self.p = Popen(args, bufsize=0,
                           stdin=PIPE, stdout=PIPE, stderr=STDOUT,
                           preexec_fn=os.setsid, env=env)
            self.stdin = self.p.stdin
            self.stdout = self.p.stdout
        else:
            # provide tty to get 'interactive' readline to work
            master, slave = pty.openpty()
            try:
                # Set terminal size large so that readline will not send
                # ANSI/VT escape codes when the lines are long.
                winsize = array.array('h', [100, 200, 0, 0])
                fcntl.ioctl(master, termios.TIOCSWINSZ, winsize, True)

                self.p = Popen(args, bufsize=0,
                               stdin=slave, stdout=slave, stderr=STDOUT,
                               preexec_fn=os.setsid, env=env)
            finally:
                # Now close slave so that we will get an exception from
                # read when the child exits early
                # http://stackoverflow.com/questions/11165521
                os.close(slave)
            self.stdin = os.fdopen(master, 'r+b', 0)
            self.stdout = self.stdin

        self.buf = ""
        self.last_prompt = ""

    def _decode(self, raw):
        """Return the text for the bytes *raw* (unchanged on Python 2).

        May return an empty string while a multi-byte character is still
        incomplete.
        """
        if not IS_PY_3:
            return raw
        if self._decoder is None:
            self._decoder = codecs.getincrementaldecoder("utf-8")("replace")
        return self._decoder.decode(raw)

    def read_to_prompt(self, prompts, timeout):
        """Read child output until one of *prompts* shows up.

        prompts -- regular expression strings, tried in the given order
        timeout -- maximum number of seconds to wait

        Returns the output that precedes the matched prompt (the prompt
        itself is consumed and remembered in last_prompt), or None when no
        prompt matched before the timeout.
        """
        patterns = [(prompt, re.compile(prompt)) for prompt in prompts]
        end_time = time.time() + timeout
        while time.time() < end_time:
            ready, _, _ = select([self.stdout], [], [], 1)
            if self.stdout not in ready:
                continue
            new_data = self._decode(self.stdout.read(1))
            debug(new_data)

            # Perform newline cleanup
            if self.no_pty:
                self.buf += new_data.replace("\n", "\r\n")
            else:
                self.buf += new_data
            self.buf = self.buf.replace("\r\r", "\r")
            self.buf = _RUSTYLINE_CSI.sub('', self.buf)

            for prompt, regexp in patterns:
                match = regexp.search(self.buf)
                if match:
                    # Slice on the match itself: the prompt is a pattern, so
                    # its length need not equal the length of what matched.
                    before = self.buf[:match.start()]
                    self.buf = self.buf[match.end():]
                    self.last_prompt = prompt
                    return before
        return None

    def writeline(self, str):
        """Send the text *str* plus a newline to the child."""
        # NOTE: the parameter name shadows the builtin; it is kept as-is for
        # interface compatibility.
        payload = str + "\n"
        if IS_PY_3:
            payload = payload.encode("utf-8")
        self.stdin.write(payload)

    def cleanup(self):
        """Send SIGTERM to the child's process group (best effort)."""
        if self.p:
            try:
                os.killpg(self.p.pid, signal.SIGTERM)
            except OSError:
                # Nothing left to signal.
                pass
            self.p = None


class TestReader:
    """Step through the entries of a Mal test file.

    After each call to next() the attributes describe the entry just read:
      msg  -- text of a ';;' comment to display, else None
      form -- form to send to the implementation, else None
      out  -- expected printed output (each line terminated by sep)
      ret  -- expected return value; "*" means any result is accepted
    soft, deferrable and optional are changed by ';>>> ' settings lines and
    keep their value until changed again.
    """

    def __init__(self, test_file):
        """Read all lines of the open file object *test_file*."""
        self.line_num = 0
        self.data = deque(test_file.read().split('\n'))
        self.soft = False
        self.deferrable = False
        self.optional = False

    def next(self):
        """Advance to the next entry of the test file.

        Returns True after a ';;' comment or after a settings line that
        enables deferrable/optional, the form string after a test form, and
        None at the end of the data or on a malformed comment line.
        """
        self.msg = None
        self.form = None
        self.out = ""
        self.ret = None

        while self.data:
            self.line_num += 1
            line = self.data.popleft()
            if re.match(r"^\s*$", line):  # blank line
                continue
            elif line.startswith(";;;"):  # ignore comment
                continue
            elif line.startswith(";;"):  # output comment
                self.msg = line[3:]
                return True
            elif line.startswith(";>>> "):  # settings/commands
                settings = {}
                # NOTE(security): the settings line is executed as Python
                # code; only run test files from a trusted source.
                exec(line[5:], {}, settings)
                if 'soft' in settings:
                    self.soft = settings['soft']
                if settings.get('deferrable'):
                    self.deferrable = "\nSkipping deferrable and optional tests"
                    return True
                if settings.get('optional'):
                    self.optional = "\nSkipping optional tests"
                    return True
                continue
            elif line.startswith(";"):  # unexpected comment
                log("Test data error at line %d:\n%s" % (self.line_num, line))
                return None

            self.form = line  # the line is a form to send

            # Now find the output and return value
            while self.data:
                line = self.data[0]
                if line.startswith(";=>"):
                    self.ret = line[3:]
                    self.line_num += 1
                    self.data.popleft()
                    break
                elif line.startswith("; "):
                    self.out = self.out + line[2:] + sep
                    self.line_num += 1
                    self.data.popleft()
                else:
                    break
            if self.ret is None:
                # No ';=>' line followed the form (including when the data
                # ran out): accept any result.
                self.ret = "*"
            # A form has been read; an empty expected value (a bare ';=>')
            # is a valid result and must not make the reader move on to the
            # following form.
            break

        return self.form


def assert_prompt(runner, prompts, timeout):
    """Wait for one of *prompts* from *runner*; exit with status 1 if none.

    Any non-empty output received before the prompt is logged.
    """
    header = runner.read_to_prompt(prompts, timeout=timeout)
    if header is not None:
        if header:
            log("Started with:\n%s" % header)
    else:
        log("Did not receive one of following prompt(s): %s" % repr(prompts))
        log(" Got : %s" % repr(runner.buf))
        sys.exit(1)


def _abort_on_exception(runner):
    """Log the exception being handled and the pending output, then exit 1."""
    _, exc, _ = sys.exc_info()
    log("\nException: %s" % repr(exc))
    log("Output before exception:\n%s" % runner.buf)
    sys.exit(1)


def main():
    """Parse the command line, run every test and exit with the status.

    The exit status is 1 when at least one hard failure occurred (or the
    implementation could not be driven), else 0.
    """
    global log_file, debug_file

    args = parser.parse_args(sys.argv[1:])
    # Workaround argparse issue with two '--' on command line
    if '--' in sys.argv:
        args.mal_cmd = sys.argv[sys.argv.index('--') + 1:]

    if args.rundir:
        os.chdir(args.rundir)
    if args.log_file:
        log_file = open(args.log_file, "a")
    if args.debug_file:
        debug_file = open(args.debug_file, "a")

    r = Runner(args.mal_cmd, no_pty=args.no_pty)
    t = TestReader(args.test_file)

    # Wait for the initial prompt. Only real errors are reported here; the
    # SystemExit raised by assert_prompt() itself passes straight through.
    try:
        assert_prompt(r, START_PROMPTS, args.start_timeout)
    except Exception:
        _abort_on_exception(r)

    # Send the pre-eval code if any
    if args.pre_eval:
        sys.stdout.write("RUNNING pre-eval: %s" % args.pre_eval)
        r.writeline(args.pre_eval)
        assert_prompt(r, START_PROMPTS, args.test_timeout)

    test_cnt = 0
    pass_cnt = 0
    fail_cnt = 0
    soft_fail_cnt = 0
    failures = []

    while t.next():
        if not args.deferrable and t.deferrable:
            log(t.deferrable)
            break

        if not args.optional and t.optional:
            log(t.optional)
            break

        if t.msg is not None:
            log(t.msg)
            continue

        if t.form is None:
            continue

        log("TEST: %s -> [%s,%s]" % (t.form, repr(t.out), t.ret), end='')

        # The repeated form is to get around an occasional OS X issue
        # where the form is repeated.
        # https://github.com/kanaka/mal/issues/30
        expected = ["%s%s%s%s" % (t.form, sep, t.out, t.ret),
                    "%s%s%s%s%s%s" % (t.form, sep, t.form, sep, t.out, t.ret)]

        try:
            r.writeline(t.form)
            test_cnt += 1
            res = r.read_to_prompt(TEST_PROMPTS, timeout=args.test_timeout)
        except Exception:
            _abort_on_exception(r)

        if t.ret == "*" or res in expected:
            log(" -> SUCCESS")
            pass_cnt += 1
            continue

        if t.soft and not args.hard:
            log(" -> SOFT FAIL (line %d):" % t.line_num)
            soft_fail_cnt += 1
            fail_type = "SOFT "
        else:
            log(" -> FAIL (line %d):" % t.line_num)
            fail_cnt += 1
            fail_type = ""
        log(" Expected : %s" % repr(expected[0]))
        log(" Got : %s" % repr(res))
        failures.append(_FAILURE_TEMPLATE % (
            fail_type, t.line_num, t.form, repr(t.out), t.ret,
            repr(expected[0]), repr(res)))

    if failures:
        log("\nFAILURES:")
        for failure in failures:
            log(failure)

    log(_RESULTS_TEMPLATE % (args.test_file.name, soft_fail_cnt,
                             fail_cnt, pass_cnt, test_cnt))

    debug("\n")  # add some separate to debug log

    sys.exit(1 if fail_cnt > 0 else 0)


if __name__ == "__main__":
    main()