#!/usr/bin/env python # License: GPLv3 Copyright: 2022, Kovid Goyal import os import shlex import shutil import subprocess import tempfile import unittest from contextlib import contextmanager from functools import lru_cache, partial from kitty.constants import kitty_base_dir, terminfo_dir from kitty.fast_data_types import CURSOR_BEAM, CURSOR_BLOCK, CURSOR_UNDERLINE from kitty.shell_integration import ( setup_bash_env, setup_fish_env, setup_zsh_env ) from . import BaseTest @lru_cache() def bash_ok(): v = shutil.which('bash') if not v: return False o = subprocess.check_output([v, '-c', 'echo "${BASH_VERSION}"']).decode('utf-8').strip() if not o or int(o[0]) < 5: return False return True def basic_shell_env(home_dir): ans = { 'PATH': os.environ.get('PATH', '/usr/local/bin:/usr/bin:/bin:/usr/sbin:/sbin'), 'HOME': home_dir, 'TERM': 'xterm-kitty', 'TERMINFO': terminfo_dir, 'KITTY_SHELL_INTEGRATION': 'enabled', 'KITTY_INSTALLATION_DIR': kitty_base_dir, 'BASH_SILENCE_DEPRECATION_WARNING': '1', } for x in ('USER', 'LANG'): if os.environ.get(x): ans[x] = os.environ[x] return ans def safe_env_for_running_shell(argv, home_dir, rc='', shell='zsh'): ans = basic_shell_env(home_dir) if shell == 'zsh': argv.insert(1, '--noglobalrcs') with open(os.path.join(home_dir, '.zshrc'), 'w') as f: print(rc + '\nZLE_RPROMPT_INDENT=0', file=f) setup_zsh_env(ans, argv) elif shell == 'fish': conf_dir = os.path.join(home_dir, '.config', 'fish') os.makedirs(conf_dir, exist_ok=True) # Avoid generating unneeded completion scripts os.makedirs(os.path.join(home_dir, '.local', 'share', 'fish', 'generated_completions'), exist_ok=True) with open(os.path.join(conf_dir, 'config.fish'), 'w') as f: print(rc + '\n', file=f) setup_fish_env(ans, argv) elif shell == 'bash': setup_bash_env(ans, argv) ans['KITTY_BASH_INJECT'] += ' posix' ans['KITTY_BASH_POSIX_ENV'] = os.path.join(home_dir, '.bashrc') with open(ans['KITTY_BASH_POSIX_ENV'], 'w') as f: # ensure LINES and COLUMNS are kept up to date print('shopt -s checkwinsize', file=f) if rc: print(rc, file=f) return ans class ShellIntegration(BaseTest): @contextmanager def run_shell(self, shell='zsh', rc='', cmd='', setup_env=None): home_dir = os.path.realpath(tempfile.mkdtemp()) cmd = cmd or shell cmd = shlex.split(cmd.format(**locals())) env = (setup_env or safe_env_for_running_shell)(cmd, home_dir, rc=rc, shell=shell) try: pty = self.create_pty(cmd, cwd=home_dir, env=env) i = 10 while i > 0 and not pty.screen_contents().strip(): pty.process_input_from_child() i -= 1 yield pty finally: if os.path.exists(home_dir): shutil.rmtree(home_dir) @unittest.skipUnless(shutil.which('zsh'), 'zsh not installed') def test_zsh_integration(self): ps1, rps1 = 'left>', ' 1) pty.write_to_child('\x04') pty.wait_till(lambda: pty.screen.cursor.shape == CURSOR_BEAM) pty.send_cmd_to_child('_set_key vi') pty.wait_till(lambda: pty.screen_contents().count(right_prompt) == 3) pty.wait_till(lambda: pty.screen.cursor.shape == CURSOR_BEAM) pty.write_to_child('\x1b') pty.wait_till(lambda: pty.screen.cursor.shape == CURSOR_BLOCK) pty.write_to_child('r') pty.wait_till(lambda: pty.screen.cursor.shape == CURSOR_UNDERLINE) pty.write_to_child('\x1b') pty.wait_till(lambda: pty.screen.cursor.shape == CURSOR_BLOCK) pty.write_to_child('i') pty.wait_till(lambda: pty.screen.cursor.shape == CURSOR_BEAM) pty.send_cmd_to_child('_set_key default') # pipestatus pty.send_cmd_to_child('clear;false|true|false') pty.send_cmd_to_child('echo $pipestatus $status') pty.wait_till(lambda: pty.screen_contents().count(right_prompt) == 2) self.ae(pty.last_cmd_output(), '1 0 1 1') pty.send_cmd_to_child('_set_status_prompt') pty.send_cmd_to_child('false|true|false') pty.wait_till(lambda: pty.screen_contents().count(right_prompt) == 4) self.assertTrue(str(pty.screen.line(pty.screen.cursor.y)).startswith(f'1 0 1 1 {fish_prompt}')) pty.send_cmd_to_child('exit') @unittest.skipUnless(bash_ok(), 'bash not installed or too old') def test_bash_integration(self): ps1 = 'prompt> ' with self.run_shell( shell='bash', rc=f''' PS1="{ps1}" ''') as pty: try: pty.wait_till(lambda: pty.screen.cursor.shape == CURSOR_BEAM) except TimeoutError as e: raise AssertionError(f'Cursor was not changed to beam. Screen contents: {repr(pty.screen_contents())}') from e pty.wait_till(lambda: pty.screen_contents().count(ps1) == 1) self.ae(pty.screen_contents(), ps1) pty.wait_till(lambda: pty.callbacks.titlebuf[-1:] == ['~']) self.ae(pty.callbacks.titlebuf[-1], '~') pty.callbacks.clear() pty.send_cmd_to_child('mkdir test && ls -a') pty.wait_till(lambda: pty.callbacks.titlebuf[-2:] == ['mkdir test && ls -a', '~']) pty.wait_till(lambda: pty.screen_contents().count(ps1) == 2) q = '\n'.join(str(pty.screen.line(i)) for i in range(1, pty.screen.cursor.y)) self.ae(pty.last_cmd_output(), q) # shrink the screen pty.write_to_child(r'echo $COLUMNS') pty.set_window_size(rows=20, columns=40) pty.process_input_from_child() def redrawn(): q = pty.screen_contents() return '$COLUMNS' in q and q.count(ps1) == 2 pty.wait_till(redrawn) self.ae(ps1 + 'echo $COLUMNS', str(pty.screen.line(pty.screen.cursor.y))) pty.write_to_child('\r') pty.wait_till(lambda: pty.screen_contents().count(ps1) == 3) self.ae('40', str(pty.screen.line(pty.screen.cursor.y - 1))) self.ae(ps1 + 'echo $COLUMNS', str(pty.screen.line(pty.screen.cursor.y - 2))) pty.send_cmd_to_child('clear') pty.wait_till(lambda: pty.screen_contents() == ps1) pty.wait_till(lambda: pty.screen.cursor.shape == CURSOR_BEAM) pty.send_cmd_to_child('cat') pty.wait_till(lambda: pty.screen.cursor.shape == 0) pty.write_to_child('\x04') pty.wait_till(lambda: pty.screen.cursor.shape == CURSOR_BEAM) pty.write_to_child('\x04') pty.send_cmd_to_child('clear') pty.wait_till(lambda: pty.callbacks.titlebuf) with self.run_shell(shell='bash', rc=f'''PS1="{ps1}"''') as pty: pty.callbacks.clear() pty.send_cmd_to_child('printf "%s\x16\a%s" "a" "b"') pty.wait_till(lambda: pty.screen_contents().count(ps1) == 2) self.ae(pty.screen_contents(), f'{ps1}printf "%s^G%s" "a" "b"\nab{ps1}') pty.send_cmd_to_child('echo $HISTFILE') pty.wait_till(lambda: '.bash_history' in pty.screen_contents()) for ps1 in ('line1\\nline\\2\\prompt> ', 'line1\nprompt> ', 'line1\\nprompt> ',): with self.subTest(ps1=ps1), self.run_shell( shell='bash', rc=f''' PS1="{ps1}" ''') as pty: ps1 = ps1.replace('\\n', '\n') pty.wait_till(lambda: pty.screen_contents().count(ps1) == 1) pty.send_cmd_to_child('echo test') pty.wait_till(lambda: pty.screen_contents().count(ps1) == 2) self.ae(pty.screen_contents(), f'{ps1}echo test\ntest\n{ps1}') pty.write_to_child(r'echo $COLUMNS') pty.set_window_size(rows=20, columns=40) pty.process_input_from_child() pty.wait_till(redrawn) self.ae(ps1.splitlines()[-1] + 'echo $COLUMNS', str(pty.screen.line(pty.screen.cursor.y))) pty.write_to_child('\r') pty.wait_till(lambda: pty.screen_contents().count(ps1) == 3) self.ae('40', str(pty.screen.line(pty.screen.cursor.y - len(ps1.splitlines())))) self.ae(ps1.splitlines()[-1] + 'echo $COLUMNS', str(pty.screen.line(pty.screen.cursor.y - 1 - len(ps1.splitlines())))) # test startup file sourcing def setup_env(excluded, argv, home_dir, rc='', shell='bash'): ans = basic_shell_env(home_dir) setup_bash_env(ans, argv) for x in {'profile', 'bash.bashrc', '.bash_profile', '.bash_login', '.profile', '.bashrc', 'rcfile'} - excluded: with open(os.path.join(home_dir, x), 'w') as f: if x == '.bashrc' and rc: print(rc, file=f) else: print(f'echo [{x}]', file=f) ans['KITTY_BASH_ETC_LOCATION'] = home_dir ans['PS1'] = 'PROMPT $ ' return ans def run_test(argv, *expected, excluded=(), rc='', wait_string='PROMPT $', assert_not_in=False): with self.subTest(argv=argv), self.run_shell(shell='bash', setup_env=partial(setup_env, set(excluded)), cmd=argv, rc=rc) as pty: pty.wait_till(lambda: wait_string in pty.screen_contents()) q = pty.screen_contents() for x in expected: if assert_not_in: self.assertNotIn(f'[{x}]', q) else: self.assertIn(f'[{x}]', q) run_test('bash', 'bash.bashrc', '.bashrc') run_test('bash --rcfile rcfile', 'bash.bashrc', 'rcfile') run_test('bash --init-file rcfile', 'bash.bashrc', 'rcfile') run_test('bash --norc') run_test('bash -l', 'profile', '.bash_profile') run_test('bash --noprofile -l') run_test('bash -l', 'profile', '.bash_login', excluded=('.bash_profile',)) run_test('bash -l', 'profile', '.profile', excluded=('.bash_profile', '.bash_login')) # test argument parsing and non-interactive shell run_test('bash -s arg1 --rcfile rcfile', 'rcfile', rc='echo ok;read', wait_string='ok', assert_not_in=True) run_test('bash +O login_shell -ic "echo ok;read"', 'bash.bashrc', excluded=('.bash_profile'), wait_string='ok', assert_not_in=True) run_test('bash -l .bashrc', 'profile', rc='echo ok;read', wait_string='ok', assert_not_in=True) run_test('bash -il -- .bashrc', 'profile', rc='echo ok;read', wait_string='ok')