kitty/kitty_tests/__init__.py

296 lines
9.6 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
2016-10-16 18:06:27 +03:00
# License: GPL v3 Copyright: 2016, Kovid Goyal <kovid at kovidgoyal.net>
import fcntl
import io
2021-02-05 07:12:01 +03:00
import os
import select
import shlex
import struct
2022-02-22 09:17:44 +03:00
import sys
import termios
import time
from pty import CHILD, fork
2016-10-16 18:06:27 +03:00
from unittest import TestCase
2021-12-23 07:43:16 +03:00
from kitty.config import finalize_keys, finalize_mouse_mappings
from kitty.fast_data_types import (
Cursor, HistoryBuf, LineBuf, Screen, get_options, parse_bytes, set_options
)
from kitty.options.parse import merge_result_dicts
2021-12-23 07:43:16 +03:00
from kitty.options.types import Options, defaults
from kitty.types import MouseEvent
2022-02-24 07:35:13 +03:00
from kitty.utils import read_screen_size
from kitty.window import process_remote_print, process_title_from_child
class Callbacks:
def __init__(self, pty=None) -> None:
self.clear()
self.pty = pty
def write(self, data) -> None:
self.wtcbuf += data
def title_changed(self, data, is_base64=False) -> None:
self.titlebuf.append(process_title_from_child(data, is_base64))
def icon_changed(self, data) -> None:
self.iconbuf += data
def set_dynamic_color(self, code, data) -> None:
self.colorbuf += data or ''
def set_color_table_color(self, code, data) -> None:
self.ctbuf += ''
2022-05-13 15:12:00 +03:00
def color_profile_popped(self, x) -> None:
pass
def request_capabilities(self, q) -> None:
from kitty.terminfo import get_capabilities
for c in get_capabilities(q, None):
self.write(c.encode('ascii'))
def use_utf8(self, on) -> None:
2017-04-28 07:11:47 +03:00
self.iutf8 = on
def desktop_notify(self, osc_code: int, raw_data: str) -> None:
self.notifications.append((osc_code, raw_data))
2020-09-03 19:25:02 +03:00
def open_url(self, url: str, hyperlink_id: int) -> None:
self.open_urls.append((url, hyperlink_id))
def clipboard_control(self, data: str, is_partial: bool = False) -> None:
self.cc_buf.append((data, is_partial))
def clear(self) -> None:
self.wtcbuf = b''
self.iconbuf = self.colorbuf = self.ctbuf = ''
self.titlebuf = []
2017-04-28 07:11:47 +03:00
self.iutf8 = True
self.notifications = []
2020-09-03 19:25:02 +03:00
self.open_urls = []
self.cc_buf = []
self.bell_count = 0
self.clone_cmds = []
self.current_clone_data = ''
def on_bell(self) -> None:
self.bell_count += 1
def on_activity_since_last_focus(self) -> None:
pass
def on_mouse_event(self, event):
ev = MouseEvent(**event)
opts = get_options()
action_def = opts.mousemap.get(ev)
if not action_def:
return False
self.current_mouse_button = ev.button
for action in opts.alias_map.resolve_aliases(action_def, 'mouse_map'):
getattr(self, action.func)(*action.args)
self.current_mouse_button = 0
return True
2022-02-22 09:17:44 +03:00
def handle_remote_print(self, msg):
text = process_remote_print(msg)
print(text, file=sys.__stderr__)
def handle_remote_clone(self, msg):
if not msg:
if self.current_clone_data:
cdata, self.current_clone_data = self.current_clone_data, ''
from kitty.launch import CloneCmd
self.clone_cmds.append(CloneCmd(cdata))
self.current_clone_data = ''
return
num, rest = msg.split(':', 1)
if num == '0' or len(self.current_clone_data) > 1024 * 1024:
self.current_clone_data = ''
self.current_clone_data += rest
def handle_remote_ssh(self, msg):
from kittens.ssh.main import get_ssh_data
2022-02-23 21:30:45 +03:00
if self.pty:
for line in get_ssh_data(msg, "testing"):
2022-02-23 21:30:45 +03:00
self.pty.write_to_child(line)
def handle_remote_echo(self, msg):
from base64 import standard_b64decode
2022-02-23 21:30:45 +03:00
if self.pty:
data = standard_b64decode(msg)
self.pty.write_to_child(data)
def filled_line_buf(ynum=5, xnum=5, cursor=Cursor()):
ans = LineBuf(ynum, xnum)
cursor.x = 0
for i in range(ynum):
t = (f'{i}') * xnum
ans.line(i).set_text(t, 0, xnum, cursor)
return ans
2016-10-16 18:06:27 +03:00
2016-11-04 12:26:31 +03:00
def filled_cursor():
ans = Cursor()
ans.bold = ans.italic = ans.reverse = ans.strikethrough = ans.dim = True
2016-11-04 12:26:31 +03:00
ans.fg = 0x101
ans.bg = 0x201
ans.decoration_fg = 0x301
return ans
2016-11-20 18:05:30 +03:00
def filled_history_buf(ynum=5, xnum=5, cursor=Cursor()):
lb = filled_line_buf(ynum, xnum, cursor)
ans = HistoryBuf(ynum, xnum)
for i in range(ynum):
ans.push(lb.line(i))
return ans
2016-10-16 18:06:27 +03:00
class BaseTest(TestCase):
ae = TestCase.assertEqual
2021-09-19 17:55:08 +03:00
maxDiff = 2048
2021-02-05 07:12:01 +03:00
is_ci = os.environ.get('CI') == 'true'
def set_options(self, options=None):
2020-02-13 06:03:57 +03:00
final_options = {'scrollback_pager_history_size': 1024, 'click_interval': 0.5}
if options:
final_options.update(options)
options = Options(merge_result_dicts(defaults._asdict(), final_options))
finalize_keys(options, {})
finalize_mouse_mappings(options, {})
set_options(options)
return options
2022-02-21 11:54:08 +03:00
def cmd_to_run_python_code(self, code):
from kitty.constants import kitty_exe
return [kitty_exe(), '+runpy', code]
2022-02-21 11:54:08 +03:00
def create_screen(self, cols=5, lines=5, scrollback=5, cell_width=10, cell_height=20, options=None):
self.set_options(options)
c = Callbacks()
s = Screen(c, lines, cols, scrollback, cell_width, cell_height, 0, c)
return s
2022-05-19 07:25:12 +03:00
def create_pty(self, argv, cols=80, lines=100, scrollback=100, cell_width=10, cell_height=20, options=None, cwd=None, env=None):
self.set_options(options)
return PTY(argv, lines, cols, scrollback, cell_width, cell_height, cwd, env)
2016-11-02 08:14:35 +03:00
def assertEqualAttributes(self, c1, c2):
x1, y1, c1.x, c1.y = c1.x, c1.y, 0, 0
x2, y2, c2.x, c2.y = c2.x, c2.y, 0, 0
try:
self.assertEqual(c1, c2)
finally:
c1.x, c1.y, c2.x, c2.y = x1, y1, x2, y2
class PTY:
2022-06-06 13:29:34 +03:00
def __init__(self, argv=None, rows=25, columns=80, scrollback=100, cell_width=10, cell_height=20, cwd=None, env=None):
2022-02-22 12:22:54 +03:00
if isinstance(argv, str):
argv = shlex.split(argv)
2022-02-24 07:35:13 +03:00
self.write_buf = b''
2022-06-06 13:29:34 +03:00
if argv is None:
from kitty.child import openpty
self.master_fd, self.slave_fd = openpty()
self.is_child = False
else:
pid, self.master_fd = fork()
self.is_child = pid == CHILD
if self.is_child:
while read_screen_size().width != columns * cell_width:
time.sleep(0.01)
if cwd:
os.chdir(cwd)
2022-02-22 12:22:54 +03:00
os.execvpe(argv[0], argv, env or os.environ)
os.set_blocking(self.master_fd, False)
self.cell_width = cell_width
self.cell_height = cell_height
self.set_window_size(rows=rows, columns=columns)
self.callbacks = Callbacks(self)
self.screen = Screen(self.callbacks, rows, columns, scrollback, cell_width, cell_height, 0, self.callbacks)
2022-02-22 09:17:44 +03:00
self.received_bytes = b''
def turn_off_echo(self):
s = termios.tcgetattr(self.master_fd)
s[3] &= ~termios.ECHO
termios.tcsetattr(self.master_fd, termios.TCSANOW, s)
def is_echo_on(self):
s = termios.tcgetattr(self.master_fd)
return True if s[3] & termios.ECHO else False
def __del__(self):
if not self.is_child:
2022-02-23 21:57:09 +03:00
fd = self.master_fd
del self.master_fd
2022-02-23 21:57:09 +03:00
os.close(fd)
def write_to_child(self, data):
2022-02-24 07:35:13 +03:00
if isinstance(data, str):
data = data.encode('utf-8')
self.write_buf += data
def send_cmd_to_child(self, cmd):
self.write_to_child(cmd + '\r')
def process_input_from_child(self, timeout=10):
2022-02-24 07:35:13 +03:00
rd, wd, err = select.select([self.master_fd], [self.master_fd] if self.write_buf else [], [self.master_fd], timeout)
if err:
raise OSError('master_fd is in error condition')
while wd:
try:
n = os.write(self.master_fd, self.write_buf)
except (BlockingIOError, OSError):
n = 0
if not n:
break
self.write_buf = self.write_buf[n:]
bytes_read = 0
2022-02-24 07:35:13 +03:00
while rd:
try:
data = os.read(self.master_fd, io.DEFAULT_BUFFER_SIZE)
except (BlockingIOError, OSError):
data = b''
if not data:
break
bytes_read += len(data)
2022-02-22 09:17:44 +03:00
self.received_bytes += data
parse_bytes(self.screen, data)
return bytes_read
def wait_till(self, q, timeout=10):
end_time = time.monotonic() + timeout
while not q() and time.monotonic() <= end_time:
self.process_input_from_child(timeout=max(0, end_time - time.monotonic()))
if not q():
2022-02-22 09:17:44 +03:00
raise TimeoutError(f'The condition was not met. Screen contents: \n {repr(self.screen_contents())}')
2022-06-06 13:29:34 +03:00
def set_window_size(self, rows=25, columns=80, send_signal=True):
if hasattr(self, 'screen'):
self.screen.resize(rows, columns)
2022-06-06 13:29:34 +03:00
if send_signal:
x_pixels = columns * self.cell_width
y_pixels = rows * self.cell_height
s = struct.pack('HHHH', rows, columns, x_pixels, y_pixels)
fcntl.ioctl(self.master_fd, termios.TIOCSWINSZ, s)
def screen_contents(self):
lines = []
for i in range(self.screen.lines):
x = str(self.screen.line(i))
if x:
lines.append(x)
return '\n'.join(lines)
def last_cmd_output(self, as_ansi=False, add_wrap_markers=False):
2022-02-23 15:44:40 +03:00
from kitty.window import cmd_output
return cmd_output(self.screen, as_ansi=as_ansi, add_wrap_markers=add_wrap_markers)