More match-statements

This commit is contained in:
Dain Nilsson 2024-10-07 15:17:44 +02:00
parent d932a424f6
commit f8b4eff328
No known key found for this signature in database
GPG Key ID: F04367096FBA95E8
2 changed files with 38 additions and 44 deletions

View File

@ -52,20 +52,21 @@ def _handle_incoming(event, recv, error, cmd_queue):
if not request: if not request:
break break
try: try:
kind = request["kind"] match request["kind"]:
if kind == "signal": case "signal":
# Cancel signals are handled here, the rest forwarded # Cancel signals are handled here, the rest forwarded
if request["status"] == "cancel": if request["status"] == "cancel":
event.set() logger.debug("Got cancel signal!")
else: event.set()
# Ignore other signals else:
logger.error("Unhandled signal: %r", request) # Ignore other signals
elif kind == "command": logger.error("Unhandled signal: %r", request)
cmd_queue.join() # Wait for existing command to complete case "command":
event.clear() # Reset event for next command cmd_queue.join() # Wait for existing command to complete
cmd_queue.put(request) event.clear() # Reset event for next command
else: cmd_queue.put(request)
error("invalid-command", "Unsupported request type") case _:
error("invalid-command", "Unsupported request type")
except KeyError as e: except KeyError as e:
error("invalid-command", str(e)) error("invalid-command", str(e))
except RpcException as e: except RpcException as e:
@ -171,7 +172,10 @@ def run_rpc_socket(sock):
def recv(): def recv():
line = b"" line = b""
while not line.endswith(b"\n"): while not line.endswith(b"\n"):
chunk = sock.recv(1024) try:
chunk = sock.recv(1024)
except ConnectionError:
return None
if not chunk: if not chunk:
return None return None
line += chunk line += chunk

View File

@ -19,12 +19,12 @@ from yubikit.core.otp import modhex_encode, modhex_decode
from yubikit.yubiotp import ( from yubikit.yubiotp import (
YubiOtpSession, YubiOtpSession,
SLOT, SLOT,
SlotConfiguration,
UpdateConfiguration, UpdateConfiguration,
HmacSha1SlotConfiguration, HmacSha1SlotConfiguration,
HotpSlotConfiguration, HotpSlotConfiguration,
StaticPasswordSlotConfiguration, StaticPasswordSlotConfiguration,
YubiOtpSlotConfiguration, YubiOtpSlotConfiguration,
StaticTicketSlotConfiguration,
) )
from ykman.otp import generate_static_pw, format_csv from ykman.otp import generate_static_pw, format_csv
from yubikit.oath import parse_b32_key from yubikit.oath import parse_b32_key
@ -108,15 +108,6 @@ class YubiOtpNode(RpcNode):
) )
_CONFIG_TYPES = dict(
hmac_sha1=HmacSha1SlotConfiguration,
hotp=HotpSlotConfiguration,
static_password=StaticPasswordSlotConfiguration,
yubiotp=YubiOtpSlotConfiguration,
static_ticket=StaticTicketSlotConfiguration,
)
class SlotNode(RpcNode): class SlotNode(RpcNode):
def __init__(self, session, slot): def __init__(self, session, slot):
super().__init__() super().__init__()
@ -169,7 +160,8 @@ class SlotNode(RpcNode):
) )
return dict(response=response) return dict(response=response)
def _apply_options(self, config, options): @staticmethod
def _apply_options(config, options) -> None:
for option in ( for option in (
"serial_api_visible", "serial_api_visible",
"serial_usb_visible", "serial_usb_visible",
@ -199,31 +191,29 @@ class SlotNode(RpcNode):
token_id, *args = options.pop("token_id") token_id, *args = options.pop("token_id")
config.token_id(bytes.fromhex(token_id), *args) config.token_id(bytes.fromhex(token_id), *args)
return config @staticmethod
def _get_config(cfg_type: str, **kwargs) -> SlotConfiguration:
def _get_config(self, type, **kwargs): match cfg_type:
config = None case "hmac_sha1":
return HmacSha1SlotConfiguration(bytes.fromhex(kwargs["key"]))
if type in _CONFIG_TYPES: case "hotp":
if type == "hmac_sha1": return HotpSlotConfiguration(parse_b32_key(kwargs["key"]))
config = _CONFIG_TYPES[type](bytes.fromhex(kwargs["key"])) case "static_password":
elif type == "hotp": return StaticPasswordSlotConfiguration(
config = _CONFIG_TYPES[type](parse_b32_key(kwargs["key"]))
elif type == "static_password":
config = _CONFIG_TYPES[type](
encode( encode(
kwargs["password"], KEYBOARD_LAYOUT[kwargs["keyboard_layout"]] kwargs["password"], KEYBOARD_LAYOUT[kwargs["keyboard_layout"]]
) )
) )
elif type == "yubiotp": case "yubiotp":
config = _CONFIG_TYPES[type]( return YubiOtpSlotConfiguration(
fixed=modhex_decode(kwargs["public_id"]), fixed=modhex_decode(kwargs["public_id"]),
uid=bytes.fromhex(kwargs["private_id"]), uid=bytes.fromhex(kwargs["private_id"]),
key=bytes.fromhex(kwargs["key"]), key=bytes.fromhex(kwargs["key"]),
) )
else: case unsupported:
raise ValueError("No supported configuration type provided.") raise ValueError(
return config f"Unsupported configuration type provided: {unsupported}"
)
@action @action
def put( def put(
@ -252,7 +242,7 @@ class SlotNode(RpcNode):
params, params,
acc_code: str | None = None, acc_code: str | None = None,
curr_acc_code: str | None = None, curr_acc_code: str | None = None,
**kwargs **kwargs,
): ):
config = UpdateConfiguration() config = UpdateConfiguration()
self._apply_options(config, kwargs) self._apply_options(config, kwargs)