mirror of
https://github.com/kovidgoyal/kitty.git
synced 2024-09-21 11:39:57 +03:00
Simplify hostname matching
Now that we load the opts upfront, we can have load_config return the final opts object itself
This commit is contained in:
parent
9b0bd81661
commit
54c5faa12d
@ -11,44 +11,23 @@
|
||||
)
|
||||
from kitty.constants import config_dir
|
||||
|
||||
from .options.types import Options as SSHOptions, defaults, option_names
|
||||
from .options.types import Options as SSHOptions, defaults
|
||||
|
||||
SYSTEM_CONF = '/etc/xdg/kitty/ssh.conf'
|
||||
defconf = os.path.join(config_dir, 'ssh.conf')
|
||||
|
||||
|
||||
def host_matches(pat: str, hostname: str, username: str) -> bool:
|
||||
upat = '*'
|
||||
if '@' in pat:
|
||||
upat, pat = pat.split('@', 1)
|
||||
return fnmatch.fnmatchcase(hostname, pat) and fnmatch.fnmatchcase(username, upat)
|
||||
def host_matches(mpat: str, hostname: str, username: str) -> bool:
|
||||
for pat in mpat.split():
|
||||
upat = '*'
|
||||
if '@' in pat:
|
||||
upat, pat = pat.split('@', 1)
|
||||
if fnmatch.fnmatchcase(hostname, pat) and fnmatch.fnmatchcase(username, upat):
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def options_for_host(hostname: str, username: str, per_host_opts: Dict[str, SSHOptions], cli_hostname: str = '', cli_uname: str = '') -> SSHOptions:
|
||||
matches = []
|
||||
for spat, opts in per_host_opts.items():
|
||||
for pat in spat.split():
|
||||
if host_matches(pat, hostname, username) or (cli_hostname and host_matches(pat, cli_hostname, cli_uname)):
|
||||
matches.append(opts)
|
||||
if not matches:
|
||||
return SSHOptions({})
|
||||
base = matches[0]
|
||||
rest = matches[1:]
|
||||
if rest:
|
||||
ans = SSHOptions(base._asdict())
|
||||
for name in option_names:
|
||||
for opts in rest:
|
||||
val = getattr(opts, name)
|
||||
if isinstance(val, dict):
|
||||
getattr(ans, name).update(val)
|
||||
else:
|
||||
setattr(ans, name, val)
|
||||
else:
|
||||
ans = base
|
||||
return ans
|
||||
|
||||
|
||||
def load_config(*paths: str, overrides: Optional[Iterable[str]] = None) -> Dict[str, SSHOptions]:
|
||||
def load_config(*paths: str, overrides: Optional[Iterable[str]] = None, hostname: str = '!', username: str = '') -> SSHOptions:
|
||||
from .options.parse import (
|
||||
create_result_dict, merge_result_dicts, parse_conf_item
|
||||
)
|
||||
@ -75,18 +54,21 @@ def parse_config(lines: Iterable[str]) -> Dict[str, Any]:
|
||||
first_seen_positions['*'] = 0
|
||||
opts_dict, paths = _load_config(
|
||||
defaults, parse_config, merge_dicts, *paths, overrides=overrides, initialize_defaults=init_results_dict)
|
||||
ans: Dict[str, SSHOptions] = {}
|
||||
phd = get_per_hosts_dict(opts_dict)
|
||||
for hostname in sorted(phd, key=first_seen_positions.__getitem__):
|
||||
opts = SSHOptions(phd[hostname])
|
||||
opts.config_paths = paths
|
||||
opts.config_overrides = overrides
|
||||
ans[hostname] = opts
|
||||
final_dict: Dict[str, Any] = {}
|
||||
for hostname_pat in sorted(phd, key=first_seen_positions.__getitem__):
|
||||
if host_matches(hostname_pat, hostname, username):
|
||||
od = phd[hostname_pat]
|
||||
for k, v in od.items():
|
||||
if isinstance(v, dict):
|
||||
bv = final_dict.setdefault(k, {})
|
||||
bv.update(v)
|
||||
else:
|
||||
final_dict[k] = v
|
||||
first_seen_positions.clear()
|
||||
return ans
|
||||
return SSHOptions(final_dict)
|
||||
|
||||
|
||||
def init_config(overrides: Optional[Iterable[str]] = None) -> Dict[str, SSHOptions]:
|
||||
def init_config(hostname: str, username: str, overrides: Optional[Iterable[str]] = None) -> SSHOptions:
|
||||
config = tuple(resolve_config(SYSTEM_CONF, defconf))
|
||||
opts_dict = load_config(*config, overrides=overrides)
|
||||
return opts_dict
|
||||
return load_config(*config, overrides=overrides, hostname=hostname, username=username)
|
||||
|
@ -37,7 +37,7 @@
|
||||
from kitty.utils import SSHConnectionData, set_echo as turn_off_echo
|
||||
|
||||
from .completion import complete, ssh_options
|
||||
from .config import init_config, options_for_host
|
||||
from .config import init_config
|
||||
from .copy import CopyInstruction
|
||||
from .options.types import Options as SSHOptions
|
||||
from .options.utils import DELETE_ENV_VAR
|
||||
@ -557,8 +557,7 @@ def run_ssh(ssh_args: List[str], server_args: List[str], found_extra_args: Tuple
|
||||
overrides.append(aq)
|
||||
if overrides:
|
||||
overrides.insert(0, f'hostname {uname}@{hostname_for_match}')
|
||||
so = init_config(overrides)
|
||||
host_opts = options_for_host(hostname_for_match, uname, so)
|
||||
host_opts = init_config(hostname_for_match, uname, overrides)
|
||||
use_control_master = host_opts.share_connections
|
||||
if use_control_master:
|
||||
cmd[insertion_point:insertion_point] = connection_sharing_args(host_opts, int(os.environ['KITTY_PID']))
|
||||
|
@ -8,7 +8,7 @@
|
||||
import tempfile
|
||||
from functools import lru_cache
|
||||
|
||||
from kittens.ssh.config import load_config, options_for_host
|
||||
from kittens.ssh.config import load_config
|
||||
from kittens.ssh.main import (
|
||||
bootstrap_script, get_connection_data, wrap_bootstrap_script
|
||||
)
|
||||
@ -59,31 +59,24 @@ def t(cmdline, binary='ssh', host='main', port=None, identity_file='', extra_arg
|
||||
self.assertTrue(runtime_dir())
|
||||
|
||||
def test_ssh_config_parsing(self):
|
||||
def parse(conf):
|
||||
return load_config(overrides=conf.splitlines())
|
||||
def parse(conf, hostname='unmatched_host', username=''):
|
||||
return load_config(overrides=conf.splitlines(), hostname=hostname, username=username)
|
||||
|
||||
def for_host(hostname, conf, username='kitty'):
|
||||
if isinstance(conf, str):
|
||||
conf = parse(conf)
|
||||
return options_for_host(hostname, username, conf)
|
||||
|
||||
self.ae(for_host('x', '').env, {})
|
||||
self.ae(for_host('x', 'env a=b').env, {'a': 'b'})
|
||||
pc = parse('env a=b\nhostname 2\nenv a=c\nenv b=b')
|
||||
self.ae(set(pc.keys()), {'*', '2'})
|
||||
self.ae(for_host('x', pc).env, {'a': 'b'})
|
||||
self.ae(for_host('2', pc).env, {'a': 'c', 'b': 'b'})
|
||||
self.ae(for_host('x', 'env a=').env, {'a': ''})
|
||||
self.ae(for_host('x', 'env a').env, {'a': '_delete_this_env_var_'})
|
||||
pc = parse('env a=b\nhostname test@2\nenv a=c\nenv b=b')
|
||||
self.ae(set(pc.keys()), {'*', 'test@2'})
|
||||
self.ae(for_host('x', pc).env, {'a': 'b'})
|
||||
self.ae(for_host('2', pc).env, {'a': 'b'})
|
||||
self.ae(for_host('2', pc, 'test').env, {'a': 'c', 'b': 'b'})
|
||||
pc = parse('env a=b\nhostname 1 2\nenv a=c\nenv b=b')
|
||||
self.ae(for_host('x', pc).env, {'a': 'b'})
|
||||
self.ae(for_host('1', pc).env, {'a': 'c', 'b': 'b'})
|
||||
self.ae(for_host('2', pc).env, {'a': 'c', 'b': 'b'})
|
||||
self.ae(parse('').env, {})
|
||||
self.ae(parse('env a=b').env, {'a': 'b'})
|
||||
conf = 'env a=b\nhostname 2\nenv a=c\nenv b=b'
|
||||
self.ae(parse(conf).env, {'a': 'b'})
|
||||
self.ae(parse(conf, '2').env, {'a': 'c', 'b': 'b'})
|
||||
self.ae(parse('env a=').env, {'a': ''})
|
||||
self.ae(parse('env a').env, {'a': '_delete_this_env_var_'})
|
||||
conf = 'env a=b\nhostname test@2\nenv a=c\nenv b=b'
|
||||
self.ae(parse(conf).env, {'a': 'b'})
|
||||
self.ae(parse(conf, '2').env, {'a': 'b'})
|
||||
self.ae(parse(conf, '2', 'test').env, {'a': 'c', 'b': 'b'})
|
||||
conf = 'env a=b\nhostname 1 2\nenv a=c\nenv b=b'
|
||||
self.ae(parse(conf).env, {'a': 'b'})
|
||||
self.ae(parse(conf, '1').env, {'a': 'c', 'b': 'b'})
|
||||
self.ae(parse(conf, '2').env, {'a': 'c', 'b': 'b'})
|
||||
|
||||
@property
|
||||
@lru_cache()
|
||||
@ -112,7 +105,7 @@ def touch(p):
|
||||
copy --glob g.*
|
||||
copy --exclude */w.* d1
|
||||
'''
|
||||
copy = load_config(overrides=filter(None, conf.splitlines()))['*'].copy
|
||||
copy = load_config(overrides=filter(None, conf.splitlines())).copy
|
||||
self.check_bootstrap(
|
||||
sh, remote_home, test_script='env; exit 0', SHELL_INTEGRATION_VALUE='',
|
||||
ssh_opts={'copy': copy}
|
||||
|
Loading…
Reference in New Issue
Block a user