#!/usr/bin/env python # License: GPL v3 Copyright: 2016, Kovid Goyal import os import random import tempfile import time import unittest import zlib from base64 import standard_b64decode, standard_b64encode from contextlib import suppress from dataclasses import dataclass from io import BytesIO from itertools import cycle from kitty.fast_data_types import load_png_data, shm_unlink, shm_write, xor_data from . import BaseTest, parse_bytes try: from PIL import Image except ImportError: Image = None def send_command(screen, cmd, payload=b''): cmd = '\033_G' + cmd if payload: if isinstance(payload, str): payload = payload.encode('utf-8') payload = standard_b64encode(payload).decode('ascii') cmd += ';' + payload cmd += '\033\\' c = screen.callbacks c.clear() parse_bytes(screen, cmd.encode('ascii')) return c.wtcbuf def parse_response(res): if not res: return return res.decode('ascii').partition(';')[2].partition('\033')[0] def parse_response_with_ids(res): if not res: return a, b = res.decode('ascii').split(';', 1) code = b.partition('\033')[0].split(':', 1)[0] a = a.split('G', 1)[1] return code, a @dataclass(frozen=True) class Response: code: str = 'OK' msg: str = '' image_id: int = 0 image_number: int = 0 frame_number: int = 0 def parse_full_response(res): if not res: return a, b = res.decode('ascii').split(';', 1) code = b.partition('\033')[0].split(':', 1) if len(code) == 1: code = code[0] msg = '' else: code, msg = code a = a.split('G', 1)[1] ans = {'code': code, 'msg': msg} for x in a.split(','): k, _, v = x.partition('=') ans[{'i': 'image_id', 'I': 'image_number', 'r': 'frame_number'}[k]] = int(v) return Response(**ans) all_bytes = bytes(bytearray(range(256))) def byte_block(sz): d, m = divmod(sz, len(all_bytes)) return (all_bytes * d) + all_bytes[:m] def load_helpers(self): s = self.create_screen() g = s.grman def pl(payload, **kw): kw.setdefault('i', 1) cmd = ','.join(f'{k}={v}' for k, v in kw.items()) res = send_command(s, cmd, payload) return parse_response(res) def sl(payload, **kw): if isinstance(payload, str): payload = payload.encode('utf-8') data = kw.pop('expecting_data', payload) cid = kw.setdefault('i', 1) self.ae('OK', pl(payload, **kw)) img = g.image_for_client_id(cid) self.assertIsNotNone(img, f'No image with id {cid} found') self.ae(img['client_id'], cid) self.ae(img['data'], data) if 's' in kw: self.ae((kw['s'], kw['v']), (img['width'], img['height'])) self.ae(img['is_4byte_aligned'], kw.get('f') != 24) return img return s, g, pl, sl def put_helpers(self, cw, ch, cols=10, lines=5): iid = 0 def create_screen(): s = self.create_screen(cols, lines, cell_width=cw, cell_height=ch) return s, 2 / s.columns, 2 / s.lines def put_cmd( z=0, num_cols=0, num_lines=0, x_off=0, y_off=0, width=0, height=0, cell_x_off=0, cell_y_off=0, placement_id=0, cursor_movement=0, unicode_placeholder=0, parent_id=0, parent_placement_id=0, offset_from_parent_x=0, offset_from_parent_y=0, ): return ( f'z={z},c={num_cols},r={num_lines},x={x_off},y={y_off},w={width},h={height},' f'X={cell_x_off},Y={cell_y_off},p={placement_id},C={cursor_movement},' f'U={unicode_placeholder},P={parent_id},Q={parent_placement_id},' f'H={offset_from_parent_x},V={offset_from_parent_y}' ) def put_image(screen, w, h, **kw): nonlocal iid iid += 1 imgid = kw.pop('id', None) or iid no_id = kw.pop('no_id', False) if no_id: cmd = 'a=T,f=24,s=%d,v=%d,%s' % (w, h, put_cmd(**kw)) else: cmd = 'a=T,f=24,i=%d,s=%d,v=%d,%s' % (imgid, w, h, put_cmd(**kw)) data = b'x' * w * h * 3 res = send_command(screen, cmd, data) return imgid, parse_response(res) def put_ref(screen, **kw): imgid = kw.pop('id', None) or iid cmd = 'a=p,i=%d,%s' % (imgid, put_cmd(**kw)) return imgid, parse_response_with_ids(send_command(screen, cmd)) def layers(screen, scrolled_by=0, xstart=-1, ystart=1): return screen.grman.update_layers(scrolled_by, xstart, ystart, dx, dy, screen.columns, screen.lines, cw, ch) def rect_eq(r, left, top, right, bottom): for side in 'left top right bottom'.split(): a, b = r[side], locals()[side] if abs(a - b) > 0.0001: self.ae(a, b, 'the %s side is not equal' % side) s, dx, dy = create_screen() return s, dx, dy, put_image, put_ref, layers, rect_eq def make_send_command(screen): def li(payload='abcdefghijkl'*3, s=4, v=3, f=24, a='f', i=1, **kw): if s: kw['s'] = s if v: kw['v'] = v if f: kw['f'] = f if i: kw['i'] = i kw['a'] = a cmd = ','.join(f'{k}={v}' for k, v in kw.items()) res = send_command(screen, cmd, payload) return parse_full_response(res) return li class TestGraphics(BaseTest): def test_xor_data(self): def xor(skey, data): ckey = cycle(bytearray(skey)) return bytes(bytearray(k ^ d for k, d in zip(ckey, bytearray(data)))) base_data = os.urandom(64) key = os.urandom(len(base_data)) for base in (b'', base_data): for extra in range(len(base_data)): data = base + base_data[:extra] self.assertEqual(xor_data(key, data), xor(key, data)) def test_disk_cache(self): s = self.create_screen() dc = s.grman.disk_cache data = {} def key_as_bytes(key): if isinstance(key, int): key = str(key) if isinstance(key, str): key = key.encode('utf-8') return bytes(key) def add(key, val): bkey = key_as_bytes(key) data[key] = key_as_bytes(val) dc.add(bkey, data[key]) def remove(key): bkey = key_as_bytes(key) data.pop(key, None) return dc.remove(bkey) def check_data(): for key, val in data.items(): self.ae(dc.get(key_as_bytes(key)), val) for i in range(25): self.assertIsNone(add(i, f'{i}' * i)) self.assertEqual(dc.total_size, sum(map(len, data.values()))) self.assertTrue(dc.wait_for_write()) check_data() sz = dc.size_on_disk() self.assertEqual(sz, sum(map(len, data.values()))) for x in (2, 4, 6, 8): remove(x) check_data() self.assertRaises(KeyError, dc.get, key_as_bytes(x)) self.assertEqual(sz, dc.size_on_disk()) for x in ('xy', 'C'*4, 'B'*6, 'A'*8): add(x, x) self.assertTrue(dc.wait_for_write()) self.assertEqual(sz, dc.size_on_disk()) check_data() check_data() dc.clear() st = time.monotonic() while dc.size_on_disk() and time.monotonic() - st < 2: time.sleep(0.001) self.assertEqual(dc.size_on_disk(), 0) data.clear() for i in range(25): self.assertIsNone(add(i, f'{i}' * i)) dc.wait_for_write() check_data() before = dc.size_on_disk() while dc.total_size > before // 3: key = random.choice(tuple(data)) self.assertTrue(remove(key)) check_data() add('trigger defrag', 'XXX') dc.wait_for_write() self.assertLess(dc.size_on_disk(), before) check_data() dc.clear() st = time.monotonic() while dc.size_on_disk() and time.monotonic() - st < 20: time.sleep(0.01) self.assertEqual(dc.size_on_disk(), 0) for frame in range(32): add(f'1:{frame}', f'{frame:02d}' * 8) dc.wait_for_write() self.assertEqual(dc.size_on_disk(), 32 * 16) self.assertEqual(dc.num_cached_in_ram(), 0) num_in_ram = 0 for frame in range(32): dc.get(key_as_bytes(f'1:{frame}')) self.assertEqual(dc.num_cached_in_ram(), num_in_ram) for frame in range(32): dc.get(key_as_bytes(f'1:{frame}'), True) num_in_ram += 1 self.assertEqual(dc.num_cached_in_ram(), num_in_ram) def clear_predicate(key): return key.startswith(b'1:') dc.remove_from_ram(clear_predicate) self.assertEqual(dc.num_cached_in_ram(), 0) def test_suppressing_gr_command_responses(self): s, g, pl, sl = load_helpers(self) self.ae(pl('abcd', s=10, v=10, q=1), 'ENODATA:Insufficient image data: 4 < 400') self.ae(pl('abcd', s=10, v=10, q=2), None) self.assertIsNone(pl('abcd', s=1, v=1, a='q', q=1)) # Test chunked load self.assertIsNone(pl('abcd', s=2, v=2, m=1, q=1)) self.assertIsNone(pl('efgh', m=1)) self.assertIsNone(pl('ijkl', m=1)) self.assertIsNone(pl('mnop', m=0)) # errors self.assertIsNone(pl('abcd', s=2, v=2, m=1, q=1)) self.ae(pl('mnop', m=0), 'ENODATA:Insufficient image data: 8 < 16') self.assertIsNone(pl('abcd', s=2, v=2, m=1, q=2)) self.assertIsNone(pl('mnop', m=0)) # frames s = self.create_screen() li = make_send_command(s) self.assertEqual(li().code, 'ENOENT') self.assertIsNone(li(q=2)) self.assertIsNone(li(a='t', q=1)) self.assertIsNone(li(payload='2' * 12, z=77, m=1, q=1)) self.assertIsNone(li(payload='2' * 12, m=1)) self.assertIsNone(li(payload='2' * 12)) self.assertIsNone(li(payload='2' * 12, z=77, m=1, q=1)) self.ae(li(payload='2' * 12).code, 'ENODATA') self.assertIsNone(li(payload='2' * 12, z=77, m=1, q=2)) self.assertIsNone(li(payload='2' * 12)) def test_load_images(self): s, g, pl, sl = load_helpers(self) self.assertEqual(g.disk_cache.total_size, 0) # Test load query self.ae(pl('abcd', s=1, v=1, a='q'), 'OK') self.ae(g.image_count, 0) # Test simple load for f in 32, 24: p = 'abc' + ('d' if f == 32 else '') img = sl(p, s=1, v=1, f=f) self.ae(bool(img['is_4byte_aligned']), f == 32) # Test chunked load self.assertIsNone(pl('abcd', s=2, v=2, m=1)) self.assertIsNone(pl('efgh', m=1)) self.assertIsNone(pl('ijkl', m=1)) self.ae(pl('mnop', m=0), 'OK') img = g.image_for_client_id(1) self.ae(img['data'], b'abcdefghijklmnop') # Test compression random_data = byte_block(3 * 1024) compressed_random_data = zlib.compress(random_data) sl( compressed_random_data, s=24, v=32, o='z', expecting_data=random_data ) # Test chunked + compressed b = len(compressed_random_data) // 2 self.assertIsNone(pl(compressed_random_data[:b], s=24, v=32, o='z', m=1)) self.ae(pl(compressed_random_data[b:], m=0), 'OK') img = g.image_for_client_id(1) self.ae(img['data'], random_data) # Test loading from file def load_temp(prefix='tty-graphics-protocol-'): f = tempfile.NamedTemporaryFile(prefix=prefix) f.write(random_data), f.flush() sl(f.name, s=24, v=32, t='f', expecting_data=random_data) self.assertTrue(os.path.exists(f.name)) f.seek(0), f.truncate(), f.write(compressed_random_data), f.flush() sl(f.name, s=24, v=32, t='t', o='z', expecting_data=random_data) return f f = load_temp() self.assertFalse(os.path.exists(f.name), f'Temp file at {f.name} was not deleted') with suppress(FileNotFoundError): f.close() f = load_temp('') self.assertTrue(os.path.exists(f.name), f'Temp file at {f.name} was deleted') f.close() # Test loading from POSIX SHM name = '/kitty-test-shm' shm_write(name, random_data) sl(name, s=24, v=32, t='s', expecting_data=random_data) self.assertRaises( FileNotFoundError, shm_unlink, name ) # check that file was deleted s.reset() self.assertEqual(g.disk_cache.total_size, 0) @unittest.skipIf(Image is None, 'PIL not available, skipping PNG tests') def test_load_png(self): s, g, pl, sl = load_helpers(self) w, h = 5, 3 rgba_data = byte_block(w * h * 4) img = Image.frombytes('RGBA', (w, h), rgba_data) rgb_data = img.convert('RGB').convert('RGBA').tobytes() self.assertEqual(g.disk_cache.total_size, 0) def png(mode='RGBA'): buf = BytesIO() i = img if mode != i.mode: i = img.convert(mode) i.save(buf, 'PNG') return buf.getvalue() for mode in 'RGBA RGB'.split(): data = png(mode) sl(data, f=100, expecting_data=rgb_data if mode == 'RGB' else rgba_data) for m in 'LP': img = img.convert(m) rgba_data = img.convert('RGBA').tobytes() data = png(m) sl(data, f=100, expecting_data=rgba_data) self.ae(pl(b'a' * 20, f=100, S=20).partition(':')[0], 'EBADPNG') s.reset() self.assertEqual(g.disk_cache.total_size, 0) def test_load_png_simple(self): # 1x1 transparent PNG png_data = standard_b64decode('iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAADUlEQVR42mNk+P+/HgAFhAJ/wlseKgAAAABJRU5ErkJggg==') expected = b'\x00\xff\xff\x7f' self.ae(load_png_data(png_data), (expected, 1, 1)) s, g, pl, sl = load_helpers(self) sl(png_data, f=100, expecting_data=expected) # test error handling for loading bad png data self.assertRaisesRegex(ValueError, '[EBADPNG]', load_png_data, b'dsfsdfsfsfd') def test_gr_operations_with_numbers(self): s = self.create_screen() g = s.grman self.assertEqual(g.disk_cache.total_size, 0) def li(payload, **kw): cmd = ','.join(f'{k}={v}' for k, v in kw.items()) res = send_command(s, cmd, payload) return parse_response_with_ids(res) code, ids = li('abc', s=1, v=1, f=24, I=1, i=3) self.ae(code, 'EINVAL') code, ids = li('abc', s=1, v=1, f=24, I=1) self.ae((code, ids), ('OK', 'i=1,I=1')) img = g.image_for_client_number(1) self.ae(img['client_number'], 1) self.ae(img['client_id'], 1) code, ids = li('abc', s=1, v=1, f=24, I=1) self.ae((code, ids), ('OK', 'i=2,I=1')) img = g.image_for_client_number(1) self.ae(img['client_number'], 1) self.ae(img['client_id'], 2) code, ids = li('abc', s=1, v=1, f=24, I=1) self.ae((code, ids), ('OK', 'i=3,I=1')) code, ids = li('abc', s=1, v=1, f=24, i=5) self.ae((code, ids), ('OK', 'i=5')) code, ids = li('abc', s=1, v=1, f=24, I=3) self.ae((code, ids), ('OK', 'i=4,I=3')) # Test chunked load with number self.assertIsNone(li('abcd', s=2, v=2, m=1, I=93)) self.assertIsNone(li('efgh', m=1)) self.assertIsNone(li('ijkx', m=1)) self.ae(li('mnop', m=0), ('OK', 'i=6,I=93')) img = g.image_for_client_number(93) self.ae(img['data'], b'abcdefghijkxmnop') self.ae(img['client_id'], 6) # test put with number def put(**kw): cmd = ','.join(f'{k}={v}' for k, v in kw.items()) cmd = 'a=p,' + cmd return parse_response_with_ids(send_command(s, cmd)) code, idstr = put(c=2, r=2, I=93) self.ae((code, idstr), ('OK', 'i=6,I=93')) code, idstr = put(c=2, r=2, I=94) self.ae(code, 'ENOENT') # test delete with number def delete(ac='N', **kw): cmd = 'a=d' if ac: cmd += f',d={ac}' if kw: cmd += ',' + ','.join(f'{k}={v}' for k, v in kw.items()) send_command(s, cmd) count = s.grman.image_count put(i=1), put(i=2), put(i=3), put(i=4), put(i=5) delete(I=94) self.ae(s.grman.image_count, count) delete(I=93) self.ae(s.grman.image_count, count - 1) delete(I=1) self.ae(s.grman.image_count, count - 2) s.reset() self.assertEqual(g.disk_cache.total_size, 0) def test_image_put(self): cw, ch = 10, 20 s, dx, dy, put_image, put_ref, layers, rect_eq = put_helpers(self, cw, ch) self.ae(put_image(s, 10, 20)[1], 'OK') l0 = layers(s) self.ae(len(l0), 1) rect_eq(l0[0]['src_rect'], 0, 0, 1, 1) rect_eq(l0[0]['dest_rect'], -1, 1, -1 + dx, 1 - dy) self.ae(l0[0]['group_count'], 1) self.ae(s.cursor.x, 1), self.ae(s.cursor.y, 0) iid, (code, idstr) = put_ref(s, num_cols=s.columns, x_off=2, y_off=1, width=3, height=5, cell_x_off=3, cell_y_off=1, z=-1, placement_id=17) self.ae(idstr, f'i={iid},p=17') l2 = layers(s) self.ae(len(l2), 2) rect_eq(l2[0]['src_rect'], 2 / 10, 1 / 20, (2 + 3) / 10, (1 + 5)/20) left, top = -1 + dx + 3 * dx / cw, 1 - 1 * dy / ch rect_eq(l2[0]['dest_rect'], left, top, -1 + (1 + s.columns) * dx, top - dy * 5 / ch) rect_eq(l2[1]['src_rect'], 0, 0, 1, 1) rect_eq(l2[1]['dest_rect'], -1, 1, -1 + dx, 1 - dy) self.ae(l2[0]['group_count'], 2) self.ae(l2[1]['group_count'], 1) self.ae(s.cursor.x, 0), self.ae(s.cursor.y, 1) self.ae(put_image(s, 10, 20, cursor_movement=1)[1], 'OK') self.ae(s.cursor.x, 0), self.ae(s.cursor.y, 1) s.reset() self.assertEqual(s.grman.disk_cache.total_size, 0) def test_image_layer_grouping(self): cw, ch = 10, 20 s, dx, dy, put_image, put_ref, layers, rect_eq = put_helpers(self, cw, ch) def group_counts(): return tuple(x['group_count'] for x in layers(s)) self.ae(put_image(s, 10, 20, id=1)[1], 'OK') self.ae(group_counts(), (1,)) put_ref(s, id=1, num_cols=2, num_lines=1, placement_id=2) put_ref(s, id=1, num_cols=2, num_lines=1, placement_id=3, z=-2) put_ref(s, id=1, num_cols=2, num_lines=1, placement_id=4, z=-2) self.ae(group_counts(), (4, 3, 2, 1)) self.ae(put_image(s, 8, 16, id=2, z=-1)[1], 'OK') self.ae(group_counts(), (2, 1, 1, 2, 1)) def test_image_parents(self): cw, ch = 10, 20 iw, ih = 10, 20 s, dx, dy, put_image, put_ref, layers, rect_eq = put_helpers(self, cw, ch) def positions(): ans = {} def x(x): return round(((x + 1)/2) * s.columns) def y(y): return int(((-y + 1)/2) * s.lines) for i in layers(s): d = i['dest_rect'] ans[(i['image_id'], i['ref_id'])] = {'x': x(d['left']), 'y': y(d['top'])} return ans def p(x, y=0): return {'x':x, 'y': y} self.ae(put_image(s, iw, ih, id=1)[1], 'OK') self.ae(put_ref(s, id=1, placement_id=1), (1, ('OK', 'i=1,p=1'))) pos = {(1, 1): p(0), (1, 2): p(1)} self.ae(positions(), pos) # check that adding a reference to a non-existent parent fails self.ae(put_ref(s, id=1, placement_id=33, parent_id=1, parent_placement_id=2), (1, ('ENOPARENT', 'i=1,p=33'))) self.ae(put_ref(s, id=1, placement_id=33, parent_id=33), (1, ('ENOPARENT', 'i=1,p=33'))) # check that we cannot add a reference that is its own parent self.ae(put_ref(s, id=1, placement_id=1, parent_id=1, parent_placement_id=1), (1, ('EINVAL', 'i=1,p=1'))) self.ae(put_image(s, iw, ih, id=2)[1], 'OK') pos[(2,1)] = p(2) self.ae(positions(), pos) # Add two children to the first placement of img2 before = s.cursor.x, s.cursor.y self.ae(put_ref(s, id=1, placement_id=2, parent_id=2, offset_from_parent_y=3), (1, ('OK', 'i=1,p=2'))) self.ae(before, (s.cursor.x, s.cursor.y), 'Cursor must not move for child image') pos[(1,3)] = p(2, 3) self.ae(positions(), pos) self.ae(put_ref(s, id=2, placement_id=3, parent_id=2, offset_from_parent_y=4), (2, ('OK', 'i=2,p=3'))) pos[(2,2)] = p(2, 4) self.ae(positions(), pos) # Add a grand child to the second child of img2 self.ae(put_ref(s, id=2, placement_id=4, parent_id=2, parent_placement_id=3, offset_from_parent_x=-1), (2, ('OK', 'i=2,p=4'))) pos[(2,3)] = p(pos[(2,2)]['x']-1, pos[(2,2)]['y']) self.ae(positions(), pos) # Check that creating a cycle is prevented self.ae(put_ref(s, id=2, placement_id=3, parent_id=2, parent_placement_id=4), (2, ('ECYCLE', 'i=2,p=3'))) self.ae(positions(), pos) # Check that depth is limited for i in range(5, 12): q = put_ref(s, id=2, placement_id=i, parent_id=2, parent_placement_id=i-1, offset_from_parent_x=-1)[1][0] if q == 'ETOODEEP': break self.ae(q, 'OK') else: self.assertTrue(False, 'Failed to limit reference chain depth') # Check that deleting a parent removes all descendants send_command(s, 'a=d,d=i,i=2,p=3') pos.pop((2,3)), pos.pop((2,2)) self.ae(positions(), pos) # Check that deleting a parent deletes all descendants and also removes # images with no remaining placements self.ae(put_ref(s, id=2, placement_id=3, parent_id=2, offset_from_parent_y=4), (2, ('OK', 'i=2,p=3'))) pos[(2,11)] = p(2, 4) self.ae(positions(), pos) self.ae(put_image(s, iw, ih, id=3, placement_id=97, parent_id=2, parent_placement_id=3)[1], 'OK') pos[(3,1)] = p(2, 4) self.ae(positions(), pos) send_command(s, 'a=d,d=i,i=2') pos.pop((3,1)), pos.pop((2,11)), pos.pop((2,1)), pos.pop((1,3)) self.ae(positions(), pos) # Check that virtual placements that try to be relative are rejected self.ae(put_ref(s, id=1, placement_id=11, parent_id=1, unicode_placeholder=1), (1, ('EINVAL', 'i=1,p=11'))) # Check creation of children of a unicode placeholder based image s, dx, dy, put_image, put_ref, layers, rect_eq = put_helpers(self, cw, ch) put_image(s, 20, 20, num_cols=4, num_lines=2, unicode_placeholder=1, id=42) s.update_only_line_graphics_data() self.assertFalse(positions()) # the reference is virtual self.ae(put_ref(s, id=42, placement_id=11, parent_id=42, offset_from_parent_y=2, offset_from_parent_x=1), (42, ('OK', 'i=42,p=11'))) self.assertFalse(positions()) # the reference is virtual without any cell images so the child is invisible s.apply_sgr("38;5;42") # These two characters will become one 2x1 ref. s.cursor.x = s.cursor.y = 1 s.draw("\U0010EEEE\u0305\u0305\U0010EEEE\u0305\u030D") s.cursor.x = s.cursor.y = 0 s.draw("\U0010EEEE\u0305\u0305\U0010EEEE\u0305\u030D") s.update_only_line_graphics_data() pos = {(1, 2): p(1, 2), (1, 3): p(0), (1, 4): p(1)} self.ae(positions(), pos) s.cursor.x = s.cursor.y = 0 s.erase_in_display(0, False) s.update_only_line_graphics_data() self.assertFalse(positions()) # the reference is virtual without any cell images so the child is invisible s.cursor.x = s.cursor.y = 2 s.draw("\U0010EEEE\u0305\u0305\U0010EEEE\u0305\u030D") s.update_only_line_graphics_data() self.ae(positions(), {(1, 5): {'x': 2, 'y': 2}, (1, 2): {'x': 3, 'y': 4}}) def test_unicode_placeholders(self): # This test tests basic image placement using using unicode placeholders cw, ch = 10, 20 s, dx, dy, put_image, put_ref, layers, rect_eq = put_helpers(self, cw, ch) # Upload two images. put_image(s, 20, 20, num_cols=4, num_lines=2, unicode_placeholder=1, id=42) put_image(s, 10, 20, num_cols=4, num_lines=2, unicode_placeholder=1, id=(42<<16) + (43<<8) + 44) # The references are virtual, so no visible refs yet. s.update_only_line_graphics_data() refs = layers(s) self.ae(len(refs), 0) # A reminder of row/column diacritics meaning (assuming 0-based): # \u0305 -> 0 # \u030D -> 1 # \u030E -> 2 # \u0310 -> 3 # Now print the placeholders for the first image. # Encode the id as an 8-bit color. s.apply_sgr("38;5;42") # These two characters will become one 2x1 ref. s.draw("\U0010EEEE\u0305\u0305\U0010EEEE\u0305\u030D") # These two characters will be two separate refs (not contiguous). s.draw("\U0010EEEE\u0305\u0305\U0010EEEE\u0305\u030E") s.cursor_back(4) s.update_only_line_graphics_data() refs = layers(s) self.ae(len(refs), 3) self.ae(refs[0]['src_rect'], {'left': 0.0, 'top': 0.0, 'right': 0.5, 'bottom': 0.5}) self.ae(refs[1]['src_rect'], {'left': 0.0, 'top': 0.0, 'right': 0.25, 'bottom': 0.5}) self.ae(refs[2]['src_rect'], {'left': 0.5, 'top': 0.0, 'right': 0.75, 'bottom': 0.5}) # Erase the line. s.erase_in_line(2) # There must be 0 refs after the line is erased. s.update_only_line_graphics_data() refs = layers(s) self.ae(len(refs), 0) # Now test encoding IDs with the 24-bit color. # The first image, 1x1 s.apply_sgr("38;2;0;0;42") s.draw("\U0010EEEE\u0305\u0305") # The second image, 2x1 s.apply_sgr("38;2;42;43;44") s.draw("\U0010EEEE\u0305\u030D\U0010EEEE\u0305\u030E") s.cursor_back(2) s.update_only_line_graphics_data() refs = layers(s) self.ae(len(refs), 2) self.ae(refs[0]['src_rect'], {'left': 0.0, 'top': 0.0, 'right': 0.25, 'bottom': 0.5}) # The second ref spans the whole widths of the second image because it's # fit to height and centered in a 4x2 box (specified in put_image). self.ae(refs[1]['src_rect'], {'left': 0.0, 'top': 0.0, 'right': 1.0, 'bottom': 0.5}) # Erase the line. s.erase_in_line(2) # Now test implicit column numbers. # We will mix implicit and explicit column/row specifications, but they # will be combine into just two references. s.apply_sgr("38;5;42") # full row 0 of the first image s.draw("\U0010EEEE\u0305\u0305\U0010EEEE\u0305\U0010EEEE\U0010EEEE\u0305") # full row 1 of the first image s.draw("\U0010EEEE\u030D\U0010EEEE\U0010EEEE\U0010EEEE\u030D\u0310") s.cursor_back(8) s.update_only_line_graphics_data() refs = layers(s) self.ae(len(refs), 2) self.ae(refs[0]['src_rect'], {'left': 0.0, 'top': 0.0, 'right': 1.0, 'bottom': 0.5}) self.ae(refs[1]['src_rect'], {'left': 0.0, 'top': 0.5, 'right': 1.0, 'bottom': 1.0}) # Now reset the screen, the images should be erased. s.reset() refs = layers(s) self.ae(len(refs), 0) def test_unicode_placeholders_3rd_combining_char(self): # This test tests that we can use the 3rd diacritic for the most # significant byte cw, ch = 10, 20 s, dx, dy, put_image, put_ref, layers, rect_eq = put_helpers(self, cw, ch) # Upload two images. put_image(s, 20, 20, num_cols=4, num_lines=2, unicode_placeholder=1, id=42) put_image(s, 20, 10, num_cols=4, num_lines=1, unicode_placeholder=1, id=(42 << 24) + 43) # This one will have id=43, which does not exist. s.apply_sgr("38;2;0;0;43") s.draw("\U0010EEEE\u0305\U0010EEEE\U0010EEEE\U0010EEEE") s.cursor_back(4) s.update_only_line_graphics_data() refs = layers(s) self.ae(len(refs), 0) s.erase_in_line(2) # This one will have id=42. We explicitly specify that the most # significant byte is 0 (third \u305). Specifying the zero byte like # this is not necessary but is correct. s.apply_sgr("38;2;0;0;42") s.draw("\U0010EEEE\u0305\u0305\u0305\U0010EEEE\u0305\u030D\u0305") # This is the second image. # \u059C -> 42 s.apply_sgr("38;2;0;0;43") s.draw("\U0010EEEE\u0305\u0305\u059C\U0010EEEE\u0305\u030D\u059C") # Check that we can continue by using implicit row/column specification. s.draw("\U0010EEEE\u0305\U0010EEEE") s.cursor_back(6) s.update_only_line_graphics_data() refs = layers(s) self.ae(len(refs), 2) self.ae(refs[0]['src_rect'], {'left': 0.0, 'top': 0.0, 'right': 0.5, 'bottom': 0.5}) self.ae(refs[1]['src_rect'], {'left': 0.0, 'top': 0.0, 'right': 1.0, 'bottom': 1.0}) s.erase_in_line(2) # Now test the 8-bit color mode. Using the third diacritic, we can # specify 16 bits: the most significant byte and the least significant # byte. s.apply_sgr("38;5;42") s.draw("\U0010EEEE\u0305\u0305\u0305\U0010EEEE") s.apply_sgr("38;5;43") s.draw("\U0010EEEE\u0305\u0305\u059C\U0010EEEE\U0010EEEE\u0305\U0010EEEE") s.cursor_back(6) s.update_only_line_graphics_data() refs = layers(s) self.ae(len(refs), 2) self.ae(refs[0]['src_rect'], {'left': 0.0, 'top': 0.0, 'right': 0.5, 'bottom': 0.5}) self.ae(refs[1]['src_rect'], {'left': 0.0, 'top': 0.0, 'right': 1.0, 'bottom': 1.0}) def test_unicode_placeholders_multiple_placements(self): # Here we test placement specification via underline color. cw, ch = 10, 20 s, dx, dy, put_image, put_ref, layers, rect_eq = put_helpers(self, cw, ch) put_image(s, 20, 20, num_cols=1, num_lines=1, placement_id=1, unicode_placeholder=1, id=42) put_ref(s, id=42, num_cols=2, num_lines=1, placement_id=22, unicode_placeholder=1) put_ref(s, id=42, num_cols=4, num_lines=2, placement_id=44, unicode_placeholder=1) # The references are virtual, so no visible refs yet. s.update_only_line_graphics_data() refs = layers(s) self.ae(len(refs), 0) # Draw the first row of each placement. s.apply_sgr("38;5;42") s.apply_sgr("58;5;1") s.draw("\U0010EEEE\u0305") s.apply_sgr("58;5;22") s.draw("\U0010EEEE\u0305\U0010EEEE\u0305") s.apply_sgr("58;5;44") s.draw("\U0010EEEE\u0305\U0010EEEE\u0305\U0010EEEE\u0305\U0010EEEE\u0305") s.update_only_line_graphics_data() refs = layers(s) self.ae(len(refs), 3) self.ae(refs[0]['src_rect'], {'left': 0.0, 'top': 0.0, 'right': 1.0, 'bottom': 1.5}) self.ae(refs[1]['src_rect'], {'left': 0.0, 'top': 0.0, 'right': 1.0, 'bottom': 1.0}) self.ae(refs[2]['src_rect'], {'left': 0.0, 'top': 0.0, 'right': 1.0, 'bottom': 0.5}) def test_unicode_placeholders_scroll(self): # Here we test scrolling of a region. We'll draw an image spanning 8 # rows and then scroll only the middle part of this image. Each # reference corresponds to one row. cw, ch = 5, 10 s, dx, dy, put_image, put_ref, layers, rect_eq = put_helpers(self, cw, ch, lines=8) put_image(s, 5, 80, num_cols=1, num_lines=8, unicode_placeholder=1, id=42) s.apply_sgr("38;5;42") s.cursor_position(1, 0) s.draw("\U0010EEEE\u0305\n") s.cursor_position(2, 0) s.draw("\U0010EEEE\u030D\n") s.cursor_position(3, 0) s.draw("\U0010EEEE\u030E\n") s.cursor_position(4, 0) s.draw("\U0010EEEE\u0310\n") s.cursor_position(5, 0) s.draw("\U0010EEEE\u0312\n") s.cursor_position(6, 0) s.draw("\U0010EEEE\u033D\n") s.cursor_position(7, 0) s.draw("\U0010EEEE\u033E\n") s.cursor_position(8, 0) s.draw("\U0010EEEE\u033F") # Each line will contain a part of the image. s.update_only_line_graphics_data() refs = layers(s) refs = sorted(refs, key=lambda r: r['src_rect']['top']) self.ae(len(refs), 8) for i in range(8): self.ae(refs[i]['src_rect'], {'left': 0.0, 'top': 0.125*i, 'right': 1.0, 'bottom': 0.125*(i + 1)}) self.ae(refs[i]['dest_rect']['top'], 1 - 0.25*i) # Now set margins to lines 3 and 6. s.set_margins(3, 6) # 1-based indexing # Scroll two lines down (i.e. move lines 3..6 up). # Lines 3 and 4 will be erased. s.cursor_position(6, 0) s.index() s.index() s.update_only_line_graphics_data() refs = layers(s) refs = sorted(refs, key=lambda r: r['src_rect']['top']) self.ae(len(refs), 6) # Lines 1 and 2 are outside of the region, not scrolled. self.ae(refs[0]['src_rect'], {'left': 0.0, 'top': 0.0, 'right': 1.0, 'bottom': 0.125}) self.ae(refs[0]['dest_rect']['top'], 1.0) self.ae(refs[1]['src_rect'], {'left': 0.0, 'top': 0.125*1, 'right': 1.0, 'bottom': 0.125*2}) self.ae(refs[1]['dest_rect']['top'], 1.0 - 0.25*1) # Lines 3 and 4 are erased. # Lines 5 and 6 are now higher. self.ae(refs[2]['src_rect'], {'left': 0.0, 'top': 0.125*4, 'right': 1.0, 'bottom': 0.125*5}) self.ae(refs[2]['dest_rect']['top'], 1.0 - 0.25*2) self.ae(refs[3]['src_rect'], {'left': 0.0, 'top': 0.125*5, 'right': 1.0, 'bottom': 0.125*6}) self.ae(refs[3]['dest_rect']['top'], 1.0 - 0.25*3) # Lines 7 and 8 are outside of the region. self.ae(refs[4]['src_rect'], {'left': 0.0, 'top': 0.125*6, 'right': 1.0, 'bottom': 0.125*7}) self.ae(refs[4]['dest_rect']['top'], 1.0 - 0.25*6) self.ae(refs[5]['src_rect'], {'left': 0.0, 'top': 0.125*7, 'right': 1.0, 'bottom': 0.125*8}) self.ae(refs[5]['dest_rect']['top'], 1.0 - 0.25*7) # Now scroll three lines up (i.e. move lines 5..6 down). # Line 6 will be erased. s.cursor_position(3, 0) s.reverse_index() s.reverse_index() s.reverse_index() s.update_only_line_graphics_data() refs = layers(s) refs = sorted(refs, key=lambda r: r['src_rect']['top']) self.ae(len(refs), 5) # Lines 1 and 2 are outside of the region, not scrolled. self.ae(refs[0]['src_rect'], {'left': 0.0, 'top': 0.0, 'right': 1.0, 'bottom': 0.125}) self.ae(refs[0]['dest_rect']['top'], 1.0) self.ae(refs[1]['src_rect'], {'left': 0.0, 'top': 0.125*1, 'right': 1.0, 'bottom': 0.125*2}) self.ae(refs[1]['dest_rect']['top'], 1.0 - 0.25*1) # Lines 3, 4 and 6 are erased. # Line 5 is now lower. self.ae(refs[2]['src_rect'], {'left': 0.0, 'top': 0.125*4, 'right': 1.0, 'bottom': 0.125*5}) self.ae(refs[2]['dest_rect']['top'], 1.0 - 0.25*5) # Lines 7 and 8 are outside of the region. self.ae(refs[3]['src_rect'], {'left': 0.0, 'top': 0.125*6, 'right': 1.0, 'bottom': 0.125*7}) self.ae(refs[3]['dest_rect']['top'], 1.0 - 0.25*6) self.ae(refs[4]['src_rect'], {'left': 0.0, 'top': 0.125*7, 'right': 1.0, 'bottom': 0.125*8}) self.ae(refs[4]['dest_rect']['top'], 1.0 - 0.25*7) def test_gr_scroll(self): cw, ch = 10, 20 s, dx, dy, put_image, put_ref, layers, rect_eq = put_helpers(self, cw, ch) put_image(s, 10, 20, no_id=True) # a one cell image at (0, 0) self.ae(len(layers(s)), 1) for i in range(s.lines): s.index() self.ae(len(layers(s)), 0), self.ae(s.grman.image_count, 1) for i in range(s.historybuf.ynum - 1): s.index() self.ae(len(layers(s)), 0), self.ae(s.grman.image_count, 1) s.index() self.ae(s.grman.image_count, 0) # Now test with margins s.reset() # Test images outside page area untouched put_image(s, cw, ch) # a one cell image at (0, 0) for i in range(s.lines - 1): s.index() put_image(s, cw, ch) # a one cell image at (0, bottom) s.set_margins(2, 4) # 1-based indexing self.ae(s.grman.image_count, 2) for i in range(s.lines + s.historybuf.ynum): s.index() self.ae(s.grman.image_count, 2) for i in range(s.lines): # ensure cursor is at top margin s.reverse_index() # Test clipped scrolling during index put_image(s, cw, 2*ch, z=-1, no_id=True) # 1x2 cell image self.ae(s.grman.image_count, 3) self.ae(layers(s)[0]['src_rect'], {'left': 0.0, 'top': 0.0, 'right': 1.0, 'bottom': 1.0}) s.index(), s.index() l0 = layers(s) self.ae(len(l0), 3) self.ae(layers(s)[0]['src_rect'], {'left': 0.0, 'top': 0.5, 'right': 1.0, 'bottom': 1.0}) s.index() self.ae(s.grman.image_count, 2) # Test clipped scrolling during reverse_index for i in range(s.lines): s.reverse_index() put_image(s, cw, 2*ch, z=-1, no_id=True) # 1x2 cell image self.ae(s.grman.image_count, 3) self.ae(layers(s)[0]['src_rect'], {'left': 0.0, 'top': 0.0, 'right': 1.0, 'bottom': 1.0}) while s.cursor.y != 1: s.reverse_index() s.reverse_index(), s.reverse_index() self.ae(layers(s)[0]['src_rect'], {'left': 0.0, 'top': 0.0, 'right': 1.0, 'bottom': 0.5}) s.reverse_index() self.ae(s.grman.image_count, 2) s.reset() self.assertEqual(s.grman.disk_cache.total_size, 0) def test_gr_reset(self): cw, ch = 10, 20 s, dx, dy, put_image, put_ref, layers, rect_eq = put_helpers(self, cw, ch) put_image(s, cw, ch) # a one cell image at (0, 0) self.ae(len(layers(s)), 1) s.reset() self.ae(s.grman.image_count, 0) put_image(s, cw, ch) # a one cell image at (0, 0) self.ae(s.grman.image_count, 1) for i in range(s.lines): s.index() s.reset() self.ae(s.grman.image_count, 1) def test_gr_delete(self): cw, ch = 10, 20 s, dx, dy, put_image, put_ref, layers, rect_eq = put_helpers(self, cw, ch) def delete(ac=None, **kw): cmd = 'a=d' if ac: cmd += f',d={ac}' if kw: cmd += ',' + ','.join(f'{k}={v}' for k, v in kw.items()) send_command(s, cmd) put_image(s, cw, ch) delete() self.ae(len(layers(s)), 0), self.ae(s.grman.image_count, 1) delete('A') self.ae(s.grman.image_count, 0) self.assertEqual(s.grman.disk_cache.total_size, 0) iid = put_image(s, cw, ch)[0] delete('I', i=iid, p=7) self.ae(s.grman.image_count, 1) delete('I', i=iid) self.ae(s.grman.image_count, 0) self.assertEqual(s.grman.disk_cache.total_size, 0) iid = put_image(s, cw, ch, placement_id=9)[0] delete('I', i=iid, p=9) self.ae(s.grman.image_count, 0) self.assertEqual(s.grman.disk_cache.total_size, 0) s.reset() put_image(s, cw, ch) put_image(s, cw, ch) delete('C') self.ae(s.grman.image_count, 2) s.cursor_position(1, 1) delete('C') self.ae(s.grman.image_count, 1) delete('P', x=2, y=1) self.ae(s.grman.image_count, 0) self.assertEqual(s.grman.disk_cache.total_size, 0) put_image(s, cw, ch, z=9) delete('Z', z=9) self.ae(s.grman.image_count, 0) self.assertEqual(s.grman.disk_cache.total_size, 0) # test put + delete + put iid = 999999 self.ae(put_image(s, cw, ch, id=iid), (iid, 'OK')) self.ae(put_ref(s, id=iid), (iid, ('OK', f'i={iid}'))) delete('i', i=iid) self.ae(s.grman.image_count, 1) self.ae(put_ref(s, id=iid), (iid, ('OK', f'i={iid}'))) delete('I', i=iid) self.ae(put_ref(s, id=iid), (iid, ('ENOENT', f'i={iid}'))) self.ae(s.grman.image_count, 0) self.assertEqual(s.grman.disk_cache.total_size, 0) def test_animation_frame_loading(self): s = self.create_screen() g = s.grman li = make_send_command(s) def t(code='OK', image_id=1, frame_number=2, **kw): res = li(**kw) if code is not None: self.assertEqual(code, res.code, f'{code} != {res.code}: {res.msg}') if image_id is not None: self.assertEqual(image_id, res.image_id) if frame_number is not None: self.assertEqual(frame_number, res.frame_number) # test error on send frame for non-existent image self.assertEqual(li().code, 'ENOENT') # create image self.assertEqual(li(a='t').code, 'OK') self.assertEqual(g.disk_cache.total_size, 36) # simple new frame (width=4, height=3) self.assertIsNone(li(payload='2' * 12, z=77, m=1)) self.assertIsNone(li(payload='2' * 12, z=77, m=1)) t(payload='2' * 12, z=77) img = g.image_for_client_id(1) self.assertEqual(img['extra_frames'], ({'gap': 77, 'id': 2, 'data': b'2' * 36},)) # test editing a frame t(payload='3' * 36, r=2) img = g.image_for_client_id(1) self.assertEqual(img['extra_frames'], ({'gap': 77, 'id': 2, 'data': b'3' * 36},)) # test editing part of a frame t(payload='4' * 12, r=2, s=2, v=2) img = g.image_for_client_id(1) def expand(*rows): ans = [] for r in rows: ans.append(''.join(x * 3 for x in str(r))) return ''.join(ans).encode('ascii') self.assertEqual(img['extra_frames'], ({'gap': 77, 'id': 2, 'data': expand(4433, 4433, 3333)},)) t(payload='5' * 12, r=2, s=2, v=2, x=1, y=1) img = g.image_for_client_id(1) self.assertEqual(img['extra_frames'], ({'gap': 77, 'id': 2, 'data': expand(4433, 4553, 3553)},)) t(payload='3' * 36, r=2) img = g.image_for_client_id(1) self.assertEqual(img['extra_frames'], ({'gap': 77, 'id': 2, 'data': b'3' * 36},)) # test loading from previous frame t(payload='4' * 12, c=2, s=2, v=2, z=101, frame_number=3) img = g.image_for_client_id(1) self.assertEqual(img['extra_frames'], ( {'gap': 77, 'id': 2, 'data': b'3' * 36}, {'gap': 101, 'id': 3, 'data': b'444444333333444444333333333333333333'}, )) # test changing gaps img = g.image_for_client_id(1) self.assertEqual(img['root_frame_gap'], 0) self.assertIsNone(li(a='a', i=1, r=1, z=13)) img = g.image_for_client_id(1) self.assertEqual(img['root_frame_gap'], 13) self.assertIsNone(li(a='a', i=1, r=2, z=43)) img = g.image_for_client_id(1) self.assertEqual(img['extra_frames'][0]['gap'], 43) # test changing current frame img = g.image_for_client_id(1) self.assertEqual(img['current_frame_index'], 0) self.assertIsNone(li(a='a', i=1, c=2)) img = g.image_for_client_id(1) self.assertEqual(img['current_frame_index'], 1) # test delete of frames t(payload='5' * 36, frame_number=4) img = g.image_for_client_id(1) self.assertEqual(img['extra_frames'], ( {'gap': 43, 'id': 2, 'data': b'3' * 36}, {'gap': 101, 'id': 3, 'data': b'444444333333444444333333333333333333'}, {'gap': 40, 'id': 4, 'data': b'5' * 36}, )) self.assertEqual(img['current_frame_index'], 1) self.assertIsNone(li(a='d', d='f', i=1, r=1)) img = g.image_for_client_id(1) self.assertEqual(img['current_frame_index'], 0) self.assertEqual(img['data'], b'3' * 36) self.assertEqual(img['extra_frames'], ( {'gap': 101, 'id': 3, 'data': b'444444333333444444333333333333333333'}, {'gap': 40, 'id': 4, 'data': b'5' * 36}, )) self.assertIsNone(li(a='a', i=1, c=3)) img = g.image_for_client_id(1) self.assertEqual(img['current_frame_index'], 2) self.assertIsNone(li(a='d', d='f', i=1, r=2)) img = g.image_for_client_id(1) self.assertEqual(img['current_frame_index'], 1) self.assertEqual(img['data'], b'3' * 36) self.assertEqual(img['extra_frames'], ( {'gap': 40, 'id': 4, 'data': b'5' * 36}, )) self.assertIsNone(li(a='d', d='f', i=1)) img = g.image_for_client_id(1) self.assertEqual(img['current_frame_index'], 0) self.assertEqual(img['data'], b'5' * 36) self.assertFalse(img['extra_frames']) self.assertIsNone(li(a='d', d='f', i=1)) img = g.image_for_client_id(1) self.assertEqual(img['data'], b'5' * 36) self.assertIsNone(li(a='d', d='F', i=1)) self.ae(g.image_count, 0) self.assertEqual(g.disk_cache.total_size, 0) # test frame composition self.assertEqual(li(a='t').code, 'OK') self.assertEqual(g.disk_cache.total_size, 36) t(payload='2' * 36) t(payload='3' * 36, frame_number=3) img = g.image_for_client_id(1) self.assertEqual(img['extra_frames'], ( {'gap': 40, 'id': 2, 'data': b'2' * 36}, {'gap': 40, 'id': 3, 'data': b'3' * 36}, )) self.assertEqual(li(a='c', i=11).code, 'ENOENT') self.assertEqual(li(a='c', i=1, r=1, c=2).code, 'OK') img = g.image_for_client_id(1) self.assertEqual(img['extra_frames'], ( {'gap': 40, 'id': 2, 'data': b'abcdefghijkl'*3}, {'gap': 40, 'id': 3, 'data': b'3' * 36}, )) self.assertEqual(li(a='c', i=1, r=2, c=3, w=1, h=2, x=1, y=1).code, 'OK') img = g.image_for_client_id(1) self.assertEqual(img['extra_frames'], ( {'gap': 40, 'id': 2, 'data': b'abcdefghijkl'*3}, {'gap': 40, 'id': 3, 'data': b'3' * 12 + (b'333abc' + b'3' * 6) * 2}, )) def test_graphics_quota_enforcement(self): s = self.create_screen() g = s.grman g.storage_limit = 36*2 li = make_send_command(s) # test quota for simple images self.assertEqual(li(a='T').code, 'OK') self.assertEqual(li(a='T', i=2).code, 'OK') self.assertEqual(g.disk_cache.total_size, g.storage_limit) self.assertEqual(g.image_count, 2) self.assertEqual(li(a='T', i=3).code, 'OK') self.assertEqual(g.disk_cache.total_size, g.storage_limit) self.assertEqual(g.image_count, 2) # test quota for frames for i in range(8): self.assertEqual(li(payload=f'{i}' * 36, i=2).code, 'OK') self.assertEqual(li(payload='x' * 36, i=2).code, 'ENOSPC') # test editing should not trigger quota self.assertEqual(li(payload='4' * 12, r=2, s=2, v=2, i=2).code, 'OK') s.reset() self.ae(g.image_count, 0) self.assertEqual(g.disk_cache.total_size, 0)