Handle remote print callback in tests

This commit is contained in:
Kovid Goyal 2022-02-22 11:47:44 +05:30
parent 05617f7dca
commit cb32a0b8fc
No known key found for this signature in database
GPG Key ID: 06BC317B515ACE7C
2 changed files with 17 additions and 6 deletions

View File

@ -301,6 +301,13 @@ def text_sanitizer(as_ansi: bool, add_wrap_markers: bool) -> Callable[[str], str
return remove_wrap_markers
def process_remote_print(msg: bytes) -> str:
from base64 import standard_b64decode
from .cli import green
text = standard_b64decode(msg).decode('utf-8', 'replace')
return text.replace('\x1b', green(r'\e')).replace('\a', green(r'\a')).replace('\0', green(r'\0'))
class EdgeWidths:
left: Optional[float]
top: Optional[float]
@ -845,11 +852,7 @@ class Window:
get_boss().handle_remote_cmd(cmd, self)
def handle_remote_print(self, msg: bytes) -> None:
from base64 import standard_b64decode
from .cli import green
text = standard_b64decode(msg).decode('utf-8')
text = text.replace('\x1b', green(r'\e')).replace('\a', green(r'\a')).replace('\0', green(r'\0'))
text = process_remote_print(msg)
print(text, end='', file=sys.stderr)
sys.stderr.flush()

View File

@ -7,6 +7,7 @@ import os
import select
import shlex
import struct
import sys
import termios
import time
from pty import CHILD, fork
@ -20,6 +21,7 @@ from kitty.options.parse import merge_result_dicts
from kitty.options.types import Options, defaults
from kitty.types import MouseEvent
from kitty.utils import read_screen_size, write_all
from kitty.window import process_remote_print
class Callbacks:
@ -87,6 +89,10 @@ class Callbacks:
self.current_mouse_button = 0
return True
def handle_remote_print(self, msg):
text = process_remote_print(msg)
print(text, file=sys.__stderr__)
def filled_line_buf(ynum=5, xnum=5, cursor=Cursor()):
ans = LineBuf(ynum, xnum)
@ -178,6 +184,7 @@ class PTY:
termios.tcsetattr(self.master_fd, termios.TCSADRAIN, new)
self.callbacks = Callbacks()
self.screen = Screen(self.callbacks, rows, columns, scrollback, cell_width, cell_height, 0, self.callbacks)
self.received_bytes = b''
def __del__(self):
if not self.is_child:
@ -205,6 +212,7 @@ class PTY:
if not data:
break
bytes_read += len(data)
self.received_bytes += data
parse_bytes(self.screen, data)
return bytes_read
@ -213,7 +221,7 @@ class PTY:
while not q() and time.monotonic() - st < timeout:
self.process_input_from_child(timeout=timeout - (time.monotonic() - st))
if not q():
raise TimeoutError('The condition was not met')
raise TimeoutError(f'The condition was not met. Screen contents: \n {repr(self.screen_contents())}')
def set_window_size(self, rows=25, columns=80):
if hasattr(self, 'screen'):