#!/usr/bin/env python3 # vim:fileencoding=utf-8 # License: GPL v3 Copyright: 2016, Kovid Goyal import time from base64 import standard_b64encode from binascii import hexlify from functools import partial from kitty.fast_data_types import CURSOR_BLOCK, parse_bytes, parse_bytes_dump from kitty.notify import ( NotificationCommand, handle_notification_cmd, notification_activated, reset_registry ) from . import BaseTest class CmdDump(list): def __call__(self, *a): self.append(a) class TestParser(BaseTest): def parse_bytes_dump(self, s, x, *cmds): cd = CmdDump() if isinstance(x, str): x = x.encode('utf-8') cmds = tuple(('draw', x) if isinstance(x, str) else x for x in cmds) parse_bytes_dump(cd, s, x) current = '' q = [] for args in cd: if args[0] == 'draw': if args[1] is not None: current += args[1] else: if current: q.append(('draw', current)) current = '' q.append(args) if current: q.append(('draw', current)) self.ae(tuple(q), cmds) def test_simple_parsing(self): s = self.create_screen() pb = partial(self.parse_bytes_dump, s) pb('12', '12') self.ae(str(s.line(0)), '12') self.ae(s.cursor.x, 2) pb('3456', '3456') self.ae(str(s.line(0)), '12345') self.ae(str(s.line(1)), '6') pb(b'\n123\n\r45', ('screen_linefeed',), '123', ('screen_linefeed',), ('screen_carriage_return',), '45') self.ae(str(s.line(1)), '6') self.ae(str(s.line(2)), ' 123') self.ae(str(s.line(3)), '45') parse_bytes(s, b'\rabcde') self.ae(str(s.line(3)), 'abcde') pb('\rßxyz1', ('screen_carriage_return',), 'ßxyz1') self.ae(str(s.line(3)), 'ßxyz1') pb('ニチ ', 'ニチ ') self.ae(str(s.line(4)), 'ニチ ') def test_esc_codes(self): s = self.create_screen() pb = partial(self.parse_bytes_dump, s) pb('12\033Da', '12', ('screen_index',), 'a') self.ae(str(s.line(0)), '12') self.ae(str(s.line(1)), ' a') pb('\033x', ('Unknown char after ESC: 0x%x' % ord('x'),)) pb('\033c123', ('screen_reset', ), '123') self.ae(str(s.line(0)), '123') def test_charsets(self): s = self.create_screen() pb = partial(self.parse_bytes_dump, s) pb(b'\xc3') pb(b'\xa1', ('draw', b'\xc3\xa1'.decode('utf-8'))) s = self.create_screen() pb = partial(self.parse_bytes_dump, s) pb('\033)0\x0e/_', ('screen_designate_charset', 1, ord('0')), ('screen_change_charset', 1), '/_') self.ae(str(s.line(0)), '/\xa0') self.assertTrue(s.callbacks.iutf8) pb('\033%@_', ('screen_use_latin1', 1), '_') self.assertFalse(s.callbacks.iutf8) s = self.create_screen() pb = partial(self.parse_bytes_dump, s) pb('\033(0/_', ('screen_designate_charset', 0, ord('0')), '/_') self.ae(str(s.line(0)), '/\xa0') def test_csi_codes(self): s = self.create_screen() pb = partial(self.parse_bytes_dump, s) pb('abcde', 'abcde') s.cursor_back(5) pb('x\033[2@y', 'x', ('screen_insert_characters', 2), 'y') self.ae(str(s.line(0)), 'xy bc') pb('x\033[2;7@y', 'x', ('CSI code 0x40 has 2 > 1 parameters',), 'y') pb('x\033[2;-7@y', 'x', ('CSI code 0x40 has 2 > 1 parameters',), 'y') pb('x\033[-2@y', 'x', ('CSI code 0x40 is not allowed to have negative parameter (-2)',), 'y') pb('x\033[2-3@y', 'x', ('CSI code can contain hyphens only at the start of numbers',), 'y') pb('x\033[@y', 'x', ('screen_insert_characters', 1), 'y') pb('x\033[345@y', 'x', ('screen_insert_characters', 345), 'y') pb('x\033[345;@y', 'x', ('screen_insert_characters', 345), 'y') pb('\033[H', ('screen_cursor_position', 1, 1)) self.ae(s.cursor.x, 0), self.ae(s.cursor.y, 0) pb('\033[4H', ('screen_cursor_position', 4, 1)) pb('\033[4;0H', ('screen_cursor_position', 4, 0)) pb('\033[3;2H', ('screen_cursor_position', 3, 2)) pb('\033[3;2;H', ('screen_cursor_position', 3, 2)) pb('\033[00000000003;0000000000000002H', ('screen_cursor_position', 3, 2)) self.ae(s.cursor.x, 1), self.ae(s.cursor.y, 2) pb('\033[J', ('screen_erase_in_display', 0, 0)) pb('\033[?J', ('screen_erase_in_display', 0, 1)) pb('\033[?2J', ('screen_erase_in_display', 2, 1)) pb('\033[h') pb('\033[20;4h', ('screen_set_mode', 20, 0), ('screen_set_mode', 4, 0)) pb('\033[?1000;1004h', ('screen_set_mode', 1000, 1), ('screen_set_mode', 1004, 1)) pb('\033[20;4;20l', ('screen_reset_mode', 20, 0), ('screen_reset_mode', 4, 0), ('screen_reset_mode', 20, 0)) pb('\033[=c', ('report_device_attributes', 0, 61)) s.reset() def sgr(params): return (('select_graphic_rendition', '{} '.format(x)) for x in params.split()) pb('\033[1;2;3;4;7;9;34;44m', *sgr('1 2 3 4 7 9 34 44')) for attr in 'bold italic reverse strikethrough dim'.split(): self.assertTrue(getattr(s.cursor, attr)) self.ae(s.cursor.decoration, 1) self.ae(s.cursor.fg, 4 << 8 | 1) self.ae(s.cursor.bg, 4 << 8 | 1) pb('\033[38;5;1;48;5;7m', ('select_graphic_rendition', '38 5 1 '), ('select_graphic_rendition', '48 5 7 ')) self.ae(s.cursor.fg, 1 << 8 | 1) self.ae(s.cursor.bg, 7 << 8 | 1) pb('\033[38;2;1;2;3;48;2;7;8;9m', ('select_graphic_rendition', '38 2 1 2 3 '), ('select_graphic_rendition', '48 2 7 8 9 ')) self.ae(s.cursor.fg, 1 << 24 | 2 << 16 | 3 << 8 | 2) self.ae(s.cursor.bg, 7 << 24 | 8 << 16 | 9 << 8 | 2) pb('\033[0;2m', *sgr('0 2')) pb('\033[;2m', *sgr('0 2')) pb('\033[m', *sgr('0 ')) pb('\033[1;;2m', *sgr('1 0 2')) pb('\033[38;5;1m', ('select_graphic_rendition', '38 5 1 ')) pb('\033[58;2;1;2;3m', ('select_graphic_rendition', '58 2 1 2 3 ')) pb('\033[38;2;1;2;3m', ('select_graphic_rendition', '38 2 1 2 3 ')) pb('\033[1001:2:1:2:3m', ('select_graphic_rendition', '1001 2 1 2 3 ')) pb('\033[38:2:1:2:3;48:5:9;58;5;7m', ( 'select_graphic_rendition', '38 2 1 2 3 '), ('select_graphic_rendition', '48 5 9 '), ('select_graphic_rendition', '58 5 7 ')) c = s.callbacks pb('\033[5n', ('report_device_status', 5, 0)) self.ae(c.wtcbuf, b'\033[0n') c.clear() pb('\033[6n', ('report_device_status', 6, 0)) self.ae(c.wtcbuf, b'\033[1;1R') pb('12345', '12345') c.clear() pb('\033[6n', ('report_device_status', 6, 0)) self.ae(c.wtcbuf, b'\033[2;1R') c.clear() s.cursor_key_mode = True pb('\033[?1$p', ('report_mode_status', 1, 1)) self.ae(c.wtcbuf, b'\033[?1;1$y') pb('\033[?1l', ('screen_reset_mode', 1, 1)) self.assertFalse(s.cursor_key_mode) c.clear() pb('\033[?1$p', ('report_mode_status', 1, 1)) self.ae(c.wtcbuf, b'\033[?1;2$y') pb('\033[2;4r', ('screen_set_margins', 2, 4)) c.clear() pb('\033[14t', ('screen_report_size', 14)) self.ae(c.wtcbuf, b'\033[4;100;50t') self.ae(s.margin_top, 1), self.ae(s.margin_bottom, 3) pb('\033[r', ('screen_set_margins', 0, 0)) self.ae(s.margin_top, 0), self.ae(s.margin_bottom, 4) pb('\033[1 q', ('screen_set_cursor', 1, ord(' '))) self.assertTrue(s.cursor.blink) self.ae(s.cursor.shape, CURSOR_BLOCK) s.reset() pb('\033[3 @', ('Shift left escape code not implemented',)) pb('\033[3 A', ('Shift right escape code not implemented',)) pb('\033[3;4 S', ('Select presentation directions escape code not implemented',)) def test_csi_code_rep(self): s = self.create_screen(8) pb = partial(self.parse_bytes_dump, s) pb('\033[1b', ('screen_repeat_character', 1)) self.ae(str(s.line(0)), '') pb('x\033[7b', 'x', ('screen_repeat_character', 7)) self.ae(str(s.line(0)), 'xxxxxxxx') pb('\033[1;3H', ('screen_cursor_position', 1, 3)) pb('\033[byz\033[b', ('screen_repeat_character', 1), 'yz', ('screen_repeat_character', 1)) # repeat 'x' at 3, then 'yz' at 4-5, then repeat 'z' at 6 self.ae(str(s.line(0)), 'xxxyzzxx') s.reset() pb(' \033[3b', ' ', ('screen_repeat_character', 3)) self.ae(str(s.line(0)), ' ') s.reset() pb('\t\033[b', ('screen_tab',), ('screen_repeat_character', 1)) self.ae(str(s.line(0)), '\t') s.reset() b']]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]' def test_osc_codes(self): s = self.create_screen() pb = partial(self.parse_bytes_dump, s) c = s.callbacks pb('a\033]2;x\\ryz\x9cbcde', 'a', ('set_title', 'x\\ryz'), 'bcde') self.ae(str(s.line(0)), 'abcde') self.ae(c.titlebuf, 'x\\ryz') c.clear() pb('\033]\x07', ('set_title', ''), ('set_icon', '')) self.ae(c.titlebuf, ''), self.ae(c.iconbuf, '') pb('\033]ab\x07', ('set_title', 'ab'), ('set_icon', 'ab')) self.ae(c.titlebuf, 'ab'), self.ae(c.iconbuf, 'ab') c.clear() pb('\033]2;;;;\x07', ('set_title', ';;;')) self.ae(c.titlebuf, ';;;') c.clear() pb('\033]2;\x07', ('set_title', '')) self.ae(c.titlebuf, '') pb('\033]110\x07', ('set_dynamic_color', 110, '')) self.ae(c.colorbuf, '') c.clear() pb('\033]9;\x07', ('desktop_notify', 9, '')) pb('\033]9;test it\x07', ('desktop_notify', 9, 'test it')) pb('\033]99;moo=foo;test it\x07', ('desktop_notify', 99, 'moo=foo;test it')) self.ae(c.notifications, [(9, ''), (9, 'test it'), (99, 'moo=foo;test it')]) c.clear() pb('\033]8;;\x07', ('set_active_hyperlink', None, None)) pb('\033]8moo\x07', ('Ignoring malformed OSC 8 code',)) pb('\033]8;moo\x07', ('Ignoring malformed OSC 8 code',)) pb('\033]8;id=xyz;\x07', ('set_active_hyperlink', 'xyz', None)) pb('\033]8;moo:x=z:id=xyz:id=abc;http://yay;.com\x07', ('set_active_hyperlink', 'xyz', 'http://yay;.com')) def test_desktop_notify(self): reset_registry() notifications = [] activations = [] prev_cmd = NotificationCommand() def reset(): nonlocal prev_cmd reset_registry() del notifications[:] del activations[:] prev_cmd = NotificationCommand() def notify(title, body, identifier): notifications.append((title, body, identifier)) def h(raw_data, osc_code=99, window_id=1): nonlocal prev_cmd x = handle_notification_cmd(osc_code, raw_data, window_id, prev_cmd, notify) if x is not None and osc_code == 99: prev_cmd = x def activated(identifier, window_id, focus, report): activations.append((identifier, window_id, focus, report)) h('test it', osc_code=9) self.ae(notifications, [('test it', '', 'i0')]) notification_activated(notifications[-1][-1], activated) self.ae(activations, [('0', 1, True, False)]) reset() h('d=0:i=x;title') h('d=1:i=x:p=body;body') self.ae(notifications, [('title', 'body', 'i0')]) notification_activated(notifications[-1][-1], activated) self.ae(activations, [('x', 1, True, False)]) reset() h('i=x:p=body:a=-focus;body') self.ae(notifications, [('body', '', 'i0')]) notification_activated(notifications[-1][-1], activated) self.ae(activations, []) reset() h('i=x:e=1;' + standard_b64encode(b'title').decode('ascii')) self.ae(notifications, [('title', '', 'i0')]) notification_activated(notifications[-1][-1], activated) self.ae(activations, [('x', 1, True, False)]) reset() h('d=0:i=x:a=-report;title') h('d=1:i=x:a=report;body') self.ae(notifications, [('titlebody', '', 'i0')]) notification_activated(notifications[-1][-1], activated) self.ae(activations, [('x', 1, True, True)]) reset() h(';title') self.ae(notifications, [('title', '', 'i0')]) notification_activated(notifications[-1][-1], activated) self.ae(activations, [('0', 1, True, False)]) reset() def test_dcs_codes(self): s = self.create_screen() c = s.callbacks pb = partial(self.parse_bytes_dump, s) q = hexlify(b'kind').decode('ascii') pb('a\033P+q{}\x9cbcde'.format(q), 'a', ('screen_request_capabilities', 43, q), 'bcde') self.ae(str(s.line(0)), 'abcde') self.ae(c.wtcbuf, '1+r{}={}'.format(q, '1b5b313b3242').encode('ascii')) c.clear() pb('\033P$q q\033\\', ('screen_request_capabilities', ord('$'), ' q')) self.ae(c.wtcbuf, b'\033P1$r1 q\033\\') c.clear() pb('\033P$qm\033\\', ('screen_request_capabilities', ord('$'), 'm')) self.ae(c.wtcbuf, b'\033P1$rm\033\\') for sgr in '0;34;102;1;2;3;4 0;38:5:200;58:2:10:11:12'.split(): expected = set(sgr.split(';')) - {'0'} c.clear() parse_bytes(s, '\033[{}m\033P$qm\033\\'.format(sgr).encode('ascii')) r = c.wtcbuf.decode('ascii').partition('r')[2].partition('m')[0] self.ae(expected, set(r.split(';'))) c.clear() pb('\033P$qr\033\\', ('screen_request_capabilities', ord('$'), 'r')) self.ae(c.wtcbuf, '\033P1$r{};{}r\033\\'.format(s.margin_top + 1, s.margin_bottom + 1).encode('ascii')) def test_sc81t(self): s = self.create_screen() pb = partial(self.parse_bytes_dump, s) pb('\033 G', ('screen_set_8bit_controls', 1)) c = s.callbacks pb('\033P$qm\033\\', ('screen_request_capabilities', ord('$'), 'm')) self.ae(c.wtcbuf, b'\x901$rm\x9c') c.clear() pb('\033[0c', ('report_device_attributes', 0, 0)) self.ae(c.wtcbuf, b'\x9b?62;c') def test_pending(self): s = self.create_screen() timeout = 0.1 s.set_pending_timeout(timeout) pb = partial(self.parse_bytes_dump, s) pb('\033P=1s\033\\', ('screen_start_pending_mode',)) pb('a') self.ae(str(s.line(0)), '') pb('\033P=2s\033\\', ('screen_stop_pending_mode',), ('draw', 'a')) self.ae(str(s.line(0)), 'a') pb('\033P=1s\033\\', ('screen_start_pending_mode',)) pb('b') self.ae(str(s.line(0)), 'a') time.sleep(timeout) pb('c', ('draw', 'bc')) self.ae(str(s.line(0)), 'abc') pb('\033P=1s\033\\d', ('screen_start_pending_mode',)) pb('\033P=2s\033\\', ('screen_stop_pending_mode',), ('draw', 'd')) pb('\033P=1s\033\\e', ('screen_start_pending_mode',)) pb('\033P'), pb('='), pb('2s') pb('\033\\', ('screen_stop_pending_mode',), ('draw', 'e')) pb('\033P=1sxyz;.;\033\\''\033P=2skjf".,> 2 else (4 << 8) | 1)