diff --git a/helper/helper/__init__.py b/helper/helper/__init__.py index 79100229..dc84fa47 100644 --- a/helper/helper/__init__.py +++ b/helper/helper/__init__.py @@ -52,20 +52,21 @@ def _handle_incoming(event, recv, error, cmd_queue): if not request: break try: - kind = request["kind"] - if kind == "signal": - # Cancel signals are handled here, the rest forwarded - if request["status"] == "cancel": - event.set() - else: - # Ignore other signals - logger.error("Unhandled signal: %r", request) - elif kind == "command": - cmd_queue.join() # Wait for existing command to complete - event.clear() # Reset event for next command - cmd_queue.put(request) - else: - error("invalid-command", "Unsupported request type") + match request["kind"]: + case "signal": + # Cancel signals are handled here, the rest forwarded + if request["status"] == "cancel": + logger.debug("Got cancel signal!") + event.set() + else: + # Ignore other signals + logger.error("Unhandled signal: %r", request) + case "command": + cmd_queue.join() # Wait for existing command to complete + event.clear() # Reset event for next command + cmd_queue.put(request) + case _: + error("invalid-command", "Unsupported request type") except KeyError as e: error("invalid-command", str(e)) except RpcException as e: @@ -171,7 +172,10 @@ def run_rpc_socket(sock): def recv(): line = b"" while not line.endswith(b"\n"): - chunk = sock.recv(1024) + try: + chunk = sock.recv(1024) + except ConnectionError: + return None if not chunk: return None line += chunk diff --git a/helper/helper/yubiotp.py b/helper/helper/yubiotp.py index 87ffacd1..feb37bf3 100644 --- a/helper/helper/yubiotp.py +++ b/helper/helper/yubiotp.py @@ -19,12 +19,12 @@ from yubikit.core.otp import modhex_encode, modhex_decode from yubikit.yubiotp import ( YubiOtpSession, SLOT, + SlotConfiguration, UpdateConfiguration, HmacSha1SlotConfiguration, HotpSlotConfiguration, StaticPasswordSlotConfiguration, YubiOtpSlotConfiguration, - StaticTicketSlotConfiguration, ) from ykman.otp import generate_static_pw, format_csv from yubikit.oath import parse_b32_key @@ -108,15 +108,6 @@ class YubiOtpNode(RpcNode): ) -_CONFIG_TYPES = dict( - hmac_sha1=HmacSha1SlotConfiguration, - hotp=HotpSlotConfiguration, - static_password=StaticPasswordSlotConfiguration, - yubiotp=YubiOtpSlotConfiguration, - static_ticket=StaticTicketSlotConfiguration, -) - - class SlotNode(RpcNode): def __init__(self, session, slot): super().__init__() @@ -169,7 +160,8 @@ class SlotNode(RpcNode): ) return dict(response=response) - def _apply_options(self, config, options): + @staticmethod + def _apply_options(config, options) -> None: for option in ( "serial_api_visible", "serial_usb_visible", @@ -199,31 +191,29 @@ class SlotNode(RpcNode): token_id, *args = options.pop("token_id") config.token_id(bytes.fromhex(token_id), *args) - return config - - def _get_config(self, type, **kwargs): - config = None - - if type in _CONFIG_TYPES: - if type == "hmac_sha1": - config = _CONFIG_TYPES[type](bytes.fromhex(kwargs["key"])) - elif type == "hotp": - config = _CONFIG_TYPES[type](parse_b32_key(kwargs["key"])) - elif type == "static_password": - config = _CONFIG_TYPES[type]( + @staticmethod + def _get_config(cfg_type: str, **kwargs) -> SlotConfiguration: + match cfg_type: + case "hmac_sha1": + return HmacSha1SlotConfiguration(bytes.fromhex(kwargs["key"])) + case "hotp": + return HotpSlotConfiguration(parse_b32_key(kwargs["key"])) + case "static_password": + return StaticPasswordSlotConfiguration( encode( kwargs["password"], KEYBOARD_LAYOUT[kwargs["keyboard_layout"]] ) ) - elif type == "yubiotp": - config = _CONFIG_TYPES[type]( + case "yubiotp": + return YubiOtpSlotConfiguration( fixed=modhex_decode(kwargs["public_id"]), uid=bytes.fromhex(kwargs["private_id"]), key=bytes.fromhex(kwargs["key"]), ) - else: - raise ValueError("No supported configuration type provided.") - return config + case unsupported: + raise ValueError( + f"Unsupported configuration type provided: {unsupported}" + ) @action def put( @@ -252,7 +242,7 @@ class SlotNode(RpcNode): params, acc_code: str | None = None, curr_acc_code: str | None = None, - **kwargs + **kwargs, ): config = UpdateConfiguration() self._apply_options(config, kwargs)