From 54c5faa12d9d476f23e4ea73971285bd045f6148 Mon Sep 17 00:00:00 2001 From: Kovid Goyal Date: Tue, 15 Mar 2022 11:25:21 +0530 Subject: [PATCH] Simplify hostname matching Now that we load the opts upfront, we can have load_config return the final opts object itself --- kittens/ssh/config.py | 64 ++++++++++++++++--------------------------- kittens/ssh/main.py | 5 ++-- kitty_tests/ssh.py | 45 +++++++++++++----------------- 3 files changed, 44 insertions(+), 70 deletions(-) diff --git a/kittens/ssh/config.py b/kittens/ssh/config.py index 6ec437dd2..854e89aac 100644 --- a/kittens/ssh/config.py +++ b/kittens/ssh/config.py @@ -11,44 +11,23 @@ from kitty.conf.utils import ( ) from kitty.constants import config_dir -from .options.types import Options as SSHOptions, defaults, option_names +from .options.types import Options as SSHOptions, defaults SYSTEM_CONF = '/etc/xdg/kitty/ssh.conf' defconf = os.path.join(config_dir, 'ssh.conf') -def host_matches(pat: str, hostname: str, username: str) -> bool: - upat = '*' - if '@' in pat: - upat, pat = pat.split('@', 1) - return fnmatch.fnmatchcase(hostname, pat) and fnmatch.fnmatchcase(username, upat) +def host_matches(mpat: str, hostname: str, username: str) -> bool: + for pat in mpat.split(): + upat = '*' + if '@' in pat: + upat, pat = pat.split('@', 1) + if fnmatch.fnmatchcase(hostname, pat) and fnmatch.fnmatchcase(username, upat): + return True + return False -def options_for_host(hostname: str, username: str, per_host_opts: Dict[str, SSHOptions], cli_hostname: str = '', cli_uname: str = '') -> SSHOptions: - matches = [] - for spat, opts in per_host_opts.items(): - for pat in spat.split(): - if host_matches(pat, hostname, username) or (cli_hostname and host_matches(pat, cli_hostname, cli_uname)): - matches.append(opts) - if not matches: - return SSHOptions({}) - base = matches[0] - rest = matches[1:] - if rest: - ans = SSHOptions(base._asdict()) - for name in option_names: - for opts in rest: - val = getattr(opts, name) - if isinstance(val, dict): - getattr(ans, name).update(val) - else: - setattr(ans, name, val) - else: - ans = base - return ans - - -def load_config(*paths: str, overrides: Optional[Iterable[str]] = None) -> Dict[str, SSHOptions]: +def load_config(*paths: str, overrides: Optional[Iterable[str]] = None, hostname: str = '!', username: str = '') -> SSHOptions: from .options.parse import ( create_result_dict, merge_result_dicts, parse_conf_item ) @@ -75,18 +54,21 @@ def load_config(*paths: str, overrides: Optional[Iterable[str]] = None) -> Dict[ first_seen_positions['*'] = 0 opts_dict, paths = _load_config( defaults, parse_config, merge_dicts, *paths, overrides=overrides, initialize_defaults=init_results_dict) - ans: Dict[str, SSHOptions] = {} phd = get_per_hosts_dict(opts_dict) - for hostname in sorted(phd, key=first_seen_positions.__getitem__): - opts = SSHOptions(phd[hostname]) - opts.config_paths = paths - opts.config_overrides = overrides - ans[hostname] = opts + final_dict: Dict[str, Any] = {} + for hostname_pat in sorted(phd, key=first_seen_positions.__getitem__): + if host_matches(hostname_pat, hostname, username): + od = phd[hostname_pat] + for k, v in od.items(): + if isinstance(v, dict): + bv = final_dict.setdefault(k, {}) + bv.update(v) + else: + final_dict[k] = v first_seen_positions.clear() - return ans + return SSHOptions(final_dict) -def init_config(overrides: Optional[Iterable[str]] = None) -> Dict[str, SSHOptions]: +def init_config(hostname: str, username: str, overrides: Optional[Iterable[str]] = None) -> SSHOptions: config = tuple(resolve_config(SYSTEM_CONF, defconf)) - opts_dict = load_config(*config, overrides=overrides) - return opts_dict + return load_config(*config, overrides=overrides, hostname=hostname, username=username) diff --git a/kittens/ssh/main.py b/kittens/ssh/main.py index b261947b0..c84bbd6dd 100644 --- a/kittens/ssh/main.py +++ b/kittens/ssh/main.py @@ -37,7 +37,7 @@ from kitty.types import run_once from kitty.utils import SSHConnectionData, set_echo as turn_off_echo from .completion import complete, ssh_options -from .config import init_config, options_for_host +from .config import init_config from .copy import CopyInstruction from .options.types import Options as SSHOptions from .options.utils import DELETE_ENV_VAR @@ -557,8 +557,7 @@ def run_ssh(ssh_args: List[str], server_args: List[str], found_extra_args: Tuple overrides.append(aq) if overrides: overrides.insert(0, f'hostname {uname}@{hostname_for_match}') - so = init_config(overrides) - host_opts = options_for_host(hostname_for_match, uname, so) + host_opts = init_config(hostname_for_match, uname, overrides) use_control_master = host_opts.share_connections if use_control_master: cmd[insertion_point:insertion_point] = connection_sharing_args(host_opts, int(os.environ['KITTY_PID'])) diff --git a/kitty_tests/ssh.py b/kitty_tests/ssh.py index b204e058e..e0b4d62be 100644 --- a/kitty_tests/ssh.py +++ b/kitty_tests/ssh.py @@ -8,7 +8,7 @@ import shutil import tempfile from functools import lru_cache -from kittens.ssh.config import load_config, options_for_host +from kittens.ssh.config import load_config from kittens.ssh.main import ( bootstrap_script, get_connection_data, wrap_bootstrap_script ) @@ -59,31 +59,24 @@ print(' '.join(map(str, buf)))'''), lines=13, cols=77) self.assertTrue(runtime_dir()) def test_ssh_config_parsing(self): - def parse(conf): - return load_config(overrides=conf.splitlines()) + def parse(conf, hostname='unmatched_host', username=''): + return load_config(overrides=conf.splitlines(), hostname=hostname, username=username) - def for_host(hostname, conf, username='kitty'): - if isinstance(conf, str): - conf = parse(conf) - return options_for_host(hostname, username, conf) - - self.ae(for_host('x', '').env, {}) - self.ae(for_host('x', 'env a=b').env, {'a': 'b'}) - pc = parse('env a=b\nhostname 2\nenv a=c\nenv b=b') - self.ae(set(pc.keys()), {'*', '2'}) - self.ae(for_host('x', pc).env, {'a': 'b'}) - self.ae(for_host('2', pc).env, {'a': 'c', 'b': 'b'}) - self.ae(for_host('x', 'env a=').env, {'a': ''}) - self.ae(for_host('x', 'env a').env, {'a': '_delete_this_env_var_'}) - pc = parse('env a=b\nhostname test@2\nenv a=c\nenv b=b') - self.ae(set(pc.keys()), {'*', 'test@2'}) - self.ae(for_host('x', pc).env, {'a': 'b'}) - self.ae(for_host('2', pc).env, {'a': 'b'}) - self.ae(for_host('2', pc, 'test').env, {'a': 'c', 'b': 'b'}) - pc = parse('env a=b\nhostname 1 2\nenv a=c\nenv b=b') - self.ae(for_host('x', pc).env, {'a': 'b'}) - self.ae(for_host('1', pc).env, {'a': 'c', 'b': 'b'}) - self.ae(for_host('2', pc).env, {'a': 'c', 'b': 'b'}) + self.ae(parse('').env, {}) + self.ae(parse('env a=b').env, {'a': 'b'}) + conf = 'env a=b\nhostname 2\nenv a=c\nenv b=b' + self.ae(parse(conf).env, {'a': 'b'}) + self.ae(parse(conf, '2').env, {'a': 'c', 'b': 'b'}) + self.ae(parse('env a=').env, {'a': ''}) + self.ae(parse('env a').env, {'a': '_delete_this_env_var_'}) + conf = 'env a=b\nhostname test@2\nenv a=c\nenv b=b' + self.ae(parse(conf).env, {'a': 'b'}) + self.ae(parse(conf, '2').env, {'a': 'b'}) + self.ae(parse(conf, '2', 'test').env, {'a': 'c', 'b': 'b'}) + conf = 'env a=b\nhostname 1 2\nenv a=c\nenv b=b' + self.ae(parse(conf).env, {'a': 'b'}) + self.ae(parse(conf, '1').env, {'a': 'c', 'b': 'b'}) + self.ae(parse(conf, '2').env, {'a': 'c', 'b': 'b'}) @property @lru_cache() @@ -112,7 +105,7 @@ copy --dest=a/sfa simple-file copy --glob g.* copy --exclude */w.* d1 ''' - copy = load_config(overrides=filter(None, conf.splitlines()))['*'].copy + copy = load_config(overrides=filter(None, conf.splitlines())).copy self.check_bootstrap( sh, remote_home, test_script='env; exit 0', SHELL_INTEGRATION_VALUE='', ssh_opts={'copy': copy}