#!/usr/bin/env python # License: GPL v3 Copyright: 2016, Kovid Goyal import fcntl import io import os import select import shlex import signal import struct import sys import termios import time from contextlib import contextmanager from pty import CHILD, STDIN_FILENO, STDOUT_FILENO, fork from unittest import TestCase from kitty.config import finalize_keys, finalize_mouse_mappings from kitty.fast_data_types import Cursor, HistoryBuf, LineBuf, Screen, get_options, parse_bytes, set_options from kitty.options.parse import merge_result_dicts from kitty.options.types import Options, defaults from kitty.types import MouseEvent from kitty.utils import read_screen_size from kitty.window import process_remote_print, process_title_from_child class Callbacks: def __init__(self, pty=None) -> None: self.clear() self.pty = pty self.ftc = None self.set_pointer_shape = lambda data: None def write(self, data) -> None: self.wtcbuf += data def title_changed(self, data, is_base64=False) -> None: self.titlebuf.append(process_title_from_child(data, is_base64)) def icon_changed(self, data) -> None: self.iconbuf += data def set_dynamic_color(self, code, data) -> None: if code == 22: self.set_pointer_shape(data) else: self.colorbuf += data or '' def set_color_table_color(self, code, data) -> None: self.ctbuf += '' def color_profile_popped(self, x) -> None: pass def cmd_output_marking(self, is_start: bool) -> None: pass def request_capabilities(self, q) -> None: from kitty.terminfo import get_capabilities for c in get_capabilities(q, None): self.write(c.encode('ascii')) def use_utf8(self, on) -> None: self.iutf8 = on def desktop_notify(self, osc_code: int, raw_data: str) -> None: self.notifications.append((osc_code, raw_data)) def open_url(self, url: str, hyperlink_id: int) -> None: self.open_urls.append((url, hyperlink_id)) def clipboard_control(self, data: str, is_partial: bool = False) -> None: self.cc_buf.append((data, is_partial)) def clear(self) -> None: self.wtcbuf = b'' self.iconbuf = self.colorbuf = self.ctbuf = '' self.titlebuf = [] self.iutf8 = True self.notifications = [] self.open_urls = [] self.cc_buf = [] self.bell_count = 0 self.clone_cmds = [] self.current_clone_data = '' def on_bell(self) -> None: self.bell_count += 1 def on_activity_since_last_focus(self) -> None: pass def on_mouse_event(self, event): ev = MouseEvent(**event) opts = get_options() action_def = opts.mousemap.get(ev) if not action_def: return False self.current_mouse_button = ev.button for action in opts.alias_map.resolve_aliases(action_def, 'mouse_map'): getattr(self, action.func)(*action.args) self.current_mouse_button = 0 return True def handle_remote_print(self, msg): text = process_remote_print(msg) print(text, file=sys.__stdout__, end='', flush=True) def handle_remote_clone(self, msg): if not msg: if self.current_clone_data: cdata, self.current_clone_data = self.current_clone_data, '' from kitty.launch import CloneCmd self.clone_cmds.append(CloneCmd(cdata)) self.current_clone_data = '' return num, rest = msg.split(':', 1) if num == '0' or len(self.current_clone_data) > 1024 * 1024: self.current_clone_data = '' self.current_clone_data += rest def handle_remote_ssh(self, msg): from kittens.ssh.utils import get_ssh_data if self.pty: for line in get_ssh_data(msg, "testing"): self.pty.write_to_child(line) def handle_remote_echo(self, msg): from base64 import standard_b64decode if self.pty: data = standard_b64decode(msg) self.pty.write_to_child(data) def file_transmission(self, data): if self.ftc: self.ftc.handle_serialized_command(data) def filled_line_buf(ynum=5, xnum=5, cursor=Cursor()): ans = LineBuf(ynum, xnum) cursor.x = 0 for i in range(ynum): t = (f'{i}') * xnum ans.line(i).set_text(t, 0, xnum, cursor) return ans def filled_cursor(): ans = Cursor() ans.bold = ans.italic = ans.reverse = ans.strikethrough = ans.dim = True ans.fg = 0x101 ans.bg = 0x201 ans.decoration_fg = 0x301 return ans def filled_history_buf(ynum=5, xnum=5, cursor=Cursor()): lb = filled_line_buf(ynum, xnum, cursor) ans = HistoryBuf(ynum, xnum) for i in range(ynum): ans.push(lb.line(i)) return ans class BaseTest(TestCase): ae = TestCase.assertEqual maxDiff = 2048 is_ci = os.environ.get('CI') == 'true' def tearDown(self): set_options(None) def set_options(self, options=None): final_options = {'scrollback_pager_history_size': 1024, 'click_interval': 0.5} if options: final_options.update(options) options = Options(merge_result_dicts(defaults._asdict(), final_options)) finalize_keys(options, {}) finalize_mouse_mappings(options, {}) set_options(options) return options def cmd_to_run_python_code(self, code): from kitty.constants import kitty_exe return [kitty_exe(), '+runpy', code] def create_screen(self, cols=5, lines=5, scrollback=5, cell_width=10, cell_height=20, options=None): self.set_options(options) c = Callbacks() s = Screen(c, lines, cols, scrollback, cell_width, cell_height, 0, c) return s def create_pty( self, argv=None, cols=80, lines=100, scrollback=100, cell_width=10, cell_height=20, options=None, cwd=None, env=None, stdin_fd=None, stdout_fd=None ): self.set_options(options) return PTY(argv, lines, cols, scrollback, cell_width, cell_height, cwd, env, stdin_fd=stdin_fd, stdout_fd=stdout_fd) def assertEqualAttributes(self, c1, c2): x1, y1, c1.x, c1.y = c1.x, c1.y, 0, 0 x2, y2, c2.x, c2.y = c2.x, c2.y, 0, 0 try: self.assertEqual(c1, c2) finally: c1.x, c1.y, c2.x, c2.y = x1, y1, x2, y2 debug_stdout = debug_stderr = -1 @contextmanager def forwardable_stdio(): global debug_stderr, debug_stdout debug_stdout = fd = os.dup(sys.stdout.fileno()) os.set_inheritable(fd, True) debug_stderr = fd = os.dup(sys.stderr.fileno()) os.set_inheritable(fd, True) try: yield finally: os.close(debug_stderr) os.close(debug_stdout) debug_stderr = debug_stdout = -1 class PTY: def __init__( self, argv=None, rows=25, columns=80, scrollback=100, cell_width=10, cell_height=20, cwd=None, env=None, stdin_fd=None, stdout_fd=None ): self.is_child = False if isinstance(argv, str): argv = shlex.split(argv) self.write_buf = b'' if argv is None: from kitty.child import openpty self.master_fd, self.slave_fd = openpty() else: self.child_pid, self.master_fd = fork() self.is_child = self.child_pid == CHILD self.child_waited_for = False if self.is_child: while read_screen_size().width != columns * cell_width: time.sleep(0.01) if cwd: os.chdir(cwd) if stdin_fd is not None: os.dup2(stdin_fd, STDIN_FILENO) os.close(stdin_fd) if stdout_fd is not None: os.dup2(stdout_fd, STDOUT_FILENO) os.close(stdout_fd) signal.pthread_sigmask(signal.SIG_SETMASK, ()) env = os.environ if env is None else env if debug_stdout > -1: env['KITTY_STDIO_FORWARDED'] = str(debug_stdout) os.execvpe(argv[0], argv, env) if stdin_fd is not None: os.close(stdin_fd) if stdout_fd is not None: os.close(stdout_fd) os.set_blocking(self.master_fd, False) self.cell_width = cell_width self.cell_height = cell_height self.set_window_size(rows=rows, columns=columns) self.callbacks = Callbacks(self) self.screen = Screen(self.callbacks, rows, columns, scrollback, cell_width, cell_height, 0, self.callbacks) self.received_bytes = b'' def turn_off_echo(self): s = termios.tcgetattr(self.master_fd) s[3] &= ~termios.ECHO termios.tcsetattr(self.master_fd, termios.TCSANOW, s) def is_echo_on(self): s = termios.tcgetattr(self.master_fd) return True if s[3] & termios.ECHO else False def __del__(self): if not self.is_child: if hasattr(self, 'master_fd'): os.close(self.master_fd) del self.master_fd if hasattr(self, 'slave_fd'): os.close(self.slave_fd) del self.slave_fd if self.child_pid > 0 and not self.child_waited_for: os.waitpid(self.child_pid, 0) self.child_waited_for = True def write_to_child(self, data, flush=False): if isinstance(data, str): data = data.encode('utf-8') self.write_buf += data if flush: self.process_input_from_child(0) def send_cmd_to_child(self, cmd, flush=False): self.write_to_child(cmd + '\r', flush=flush) def process_input_from_child(self, timeout=10): rd, wd, err = select.select([self.master_fd], [self.master_fd] if self.write_buf else [], [], timeout) while wd: try: n = os.write(self.master_fd, self.write_buf) except (BlockingIOError, OSError): n = 0 if not n: break self.write_buf = self.write_buf[n:] bytes_read = 0 while rd: try: data = os.read(self.master_fd, io.DEFAULT_BUFFER_SIZE) except (BlockingIOError, OSError): data = b'' if not data: break bytes_read += len(data) self.received_bytes += data parse_bytes(self.screen, data) return bytes_read def wait_till(self, q, timeout=10): end_time = time.monotonic() + timeout while not q() and time.monotonic() <= end_time: self.process_input_from_child(timeout=max(0, end_time - time.monotonic())) if not q(): raise TimeoutError(f'The condition was not met. Screen contents: \n {repr(self.screen_contents())}') def wait_till_child_exits(self, timeout=30 if BaseTest.is_ci else 10, require_exit_code=None): end_time = time.monotonic() + timeout while time.monotonic() <= end_time: si_pid, status = os.waitpid(self.child_pid, os.WNOHANG) if si_pid == self.child_pid and os.WIFEXITED(status): ec = os.waitstatus_to_exitcode(status) if hasattr(os, 'waitstatus_to_exitcode') else require_exit_code self.child_waited_for = True if require_exit_code is not None and ec != require_exit_code: raise AssertionError( f'Child exited with exit status: {status} code: {ec} != {require_exit_code}.' f' Screen contents:\n{self.screen_contents()}') return status self.process_input_from_child(timeout=0.02) raise AssertionError(f'Child did not exit in {timeout} seconds. Screen contents:\n{self.screen_contents()}') def set_window_size(self, rows=25, columns=80, send_signal=True): if hasattr(self, 'screen'): self.screen.resize(rows, columns) if send_signal: x_pixels = columns * self.cell_width y_pixels = rows * self.cell_height s = struct.pack('HHHH', rows, columns, x_pixels, y_pixels) fcntl.ioctl(self.master_fd, termios.TIOCSWINSZ, s) def screen_contents(self): lines = [] for i in range(self.screen.lines): x = str(self.screen.line(i)) if x: lines.append(x) return '\n'.join(lines) def last_cmd_output(self, as_ansi=False, add_wrap_markers=False): from kitty.window import cmd_output return cmd_output(self.screen, as_ansi=as_ansi, add_wrap_markers=add_wrap_markers)