Skip to content

Commit

Permalink
Add support for hostname canonicalization
Browse files Browse the repository at this point in the history
This commit adds support for hostname canonicalization. It supports the
same config items used in OpenSSH config files, but also provides
options which can be passed directly, without the need to use a config
file. This commit also adds automatic reloading of the config file when
canonicalization is performed, as well as support for match keywords
"canonical" and "final".

This commit also adds support for negation of match keywords. Thanks go
to GitHub user commonism who suggested adding this support and provided
a proposed implementation.
  • Loading branch information
ronf committed Nov 24, 2024
1 parent 32a9fe1 commit b99e8d5
Show file tree
Hide file tree
Showing 7 changed files with 493 additions and 61 deletions.
118 changes: 86 additions & 32 deletions asyncssh/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,15 @@ class SSHConfig:
_percent_expand = {'AuthorizedKeysFile'}
_handlers: Dict[str, Tuple[str, Callable]] = {}

def __init__(self, last_config: Optional['SSHConfig'], reload: bool):
def __init__(self, last_config: Optional['SSHConfig'], reload: bool,
canonical: bool, final: bool):
if last_config:
self._last_options = last_config.get_options(reload)
else:
self._last_options = {}

self._canonical = canonical
self._final = True if final else None
self._default_path = Path('~', '.ssh').expanduser()
self._path = Path()
self._line_no = 0
Expand Down Expand Up @@ -153,35 +156,53 @@ def _match(self, option: str, args: List[str]) -> None:

# pylint: disable=unused-argument

matching = True

while args:
match = args.pop(0).lower()

if match[0] == '!':
match = match[1:]
negated = True
else:
negated = False

if match == 'final' and self._final is None:
self._final = False

if match == 'all':
self._matching = True
continue
result = True
elif match == 'canonical':
result = self._canonical
elif match == 'final':
result = cast(bool, self._final)
else:
match_val = self._match_val(match)

match_val = self._match_val(match)
if match != 'exec' and match_val is None:
self._error(f'Invalid match condition {match}')

if match != 'exec' and match_val is None:
self._error('Invalid match condition')
try:
arg = args.pop(0)
except IndexError:
self._error(f'Missing {match} match pattern')

if matching:
if match == 'exec':
result = _exec(arg)
elif match in ('address', 'localaddress'):
host_pat = HostPatternList(arg)
ip = ip_address(cast(str, match_val)) \
if match_val else None
result = host_pat.matches(None, match_val, ip)
else:
wild_pat = WildcardPatternList(arg)
result = wild_pat.matches(match_val)

try:
if match == 'exec':
self._matching = _exec(args.pop(0))
elif match in ('address', 'localaddress'):
host_pat = HostPatternList(args.pop(0))
ip = ip_address(cast(str, match_val)) \
if match_val else None
self._matching = host_pat.matches(None, match_val, ip)
else:
wild_pat = WildcardPatternList(args.pop(0))
self._matching = wild_pat.matches(match_val)
except IndexError:
self._error(f'Missing {match} match pattern')
if matching and result == negated:
matching = False

if not self._matching:
args.clear()
break
self._matching = matching

def _set_bool(self, option: str, args: List[str]) -> None:
"""Set a boolean config option"""
Expand Down Expand Up @@ -276,6 +297,23 @@ def _set_address_family(self, option: str, args: List[str]) -> None:
if option not in self._options:
self._options[option] = value

def _set_canonicalize_host(self, option: str, args: List[str]) -> None:
"""Set a canonicalize host config option"""

value_str = args.pop(0).lower()

if value_str in ('yes', 'true'):
value: Union[bool, str] = True
elif value_str in ('no', 'false'):
value = False
elif value_str == 'always':
value = value_str
else:
self._error(f'Invalid {option} value: {value_str}')

if option not in self._options:
self._options[option] = value

def _set_rekey_limits(self, option: str, args: List[str]) -> None:
"""Set rekey limits config option"""

Expand All @@ -295,6 +333,11 @@ def _set_rekey_limits(self, option: str, args: List[str]) -> None:
if option not in self._options:
self._options[option] = byte_limit, time_limit

def has_match_final(self) -> bool:
"""Return whether this config includes a 'Match final' block"""

return self._final is not None

def parse(self, path: Path) -> None:
"""Parse an OpenSSH config file and return matching declarations"""

Expand Down Expand Up @@ -384,10 +427,10 @@ def get_options(self, reload: bool) -> Dict[str, object]:
@classmethod
def load(cls, last_config: Optional['SSHConfig'],
config_paths: ConfigPaths, reload: bool,
*args: object) -> 'SSHConfig':
canonical: bool, final: bool, *args: object) -> 'SSHConfig':
"""Load a list of OpenSSH config files into a config object"""

config = cls(last_config, reload, *args)
config = cls(last_config, reload, canonical, final, *args)

if config_paths:
if isinstance(config_paths, (str, PurePath)):
Expand Down Expand Up @@ -429,8 +472,9 @@ class SSHClientConfig(SSHConfig):
'IdentityFile', 'ProxyCommand', 'RemoteCommand'}

def __init__(self, last_config: 'SSHConfig', reload: bool,
local_user: str, user: str, host: str, port: int) -> None:
super().__init__(last_config, reload)
canonical: bool, final: bool, local_user: str,
user: str, host: str, port: int) -> None:
super().__init__(last_config, reload, canonical, final)

self._local_user = local_user
self._orig_host = host
Expand Down Expand Up @@ -485,10 +529,10 @@ def _set_request_tty(self, option: str, args: List[str]) -> None:
value: Union[bool, str] = True
elif value_str in ('no', 'false'):
value = False
elif value_str not in ('force', 'auto'):
self._error(f'Invalid {option} value: {value_str}')
else:
elif value_str in ('force', 'auto'):
value = value_str
else:
self._error(f'Invalid {option} value: {value_str}')

if option not in self._options:
self._options[option] = value
Expand Down Expand Up @@ -531,6 +575,11 @@ def _set_tokens(self) -> None:

('AddressFamily', SSHConfig._set_address_family),
('BindAddress', SSHConfig._set_string),
('CanonicalDomains', SSHConfig._set_string_list),
('CanonicalizeFallbackLocal', SSHConfig._set_bool),
('CanonicalizeHostname', SSHConfig._set_canonicalize_host),
('CanonicalizeMaxDots', SSHConfig._set_int),
('CanonicalizePermittedCNAMEs', SSHConfig._set_string_list),
('CASignatureAlgorithms', SSHConfig._set_string),
('CertificateFile', SSHConfig._append_string),
('ChallengeResponseAuthentication', SSHConfig._set_bool),
Expand Down Expand Up @@ -579,9 +628,9 @@ class SSHServerConfig(SSHConfig):
"""Settings from an OpenSSH server config file"""

def __init__(self, last_config: 'SSHConfig', reload: bool,
local_addr: str, local_port: int, user: str,
host: str, addr: str) -> None:
super().__init__(last_config, reload)
canonical: bool, final: bool, local_addr: str,
local_port: int, user: str, host: str, addr: str) -> None:
super().__init__(last_config, reload, canonical, final)

self._local_addr = local_addr
self._local_port = local_port
Expand Down Expand Up @@ -618,6 +667,11 @@ def _set_tokens(self) -> None:
('AuthorizedKeysFile', SSHConfig._set_string_list),
('AllowAgentForwarding', SSHConfig._set_bool),
('BindAddress', SSHConfig._set_string),
('CanonicalDomains', SSHConfig._set_string_list),
('CanonicalizeFallbackLocal', SSHConfig._set_bool),
('CanonicalizeHostname', SSHConfig._set_canonicalize_host),
('CanonicalizeMaxDots', SSHConfig._set_int),
('CanonicalizePermittedCNAMEs', SSHConfig._set_string_list),
('CASignatureAlgorithms', SSHConfig._set_string),
('ChallengeResponseAuthentication', SSHConfig._set_bool),
('Ciphers', SSHConfig._set_string),
Expand Down
Loading

0 comments on commit b99e8d5

Please sign in to comment.