Add helper utility function for OTP.

This commit is contained in:
Elias Bonnici 2023-11-09 13:39:58 +01:00
parent b63fb5ba9e
commit 06372442db
No known key found for this signature in database
GPG Key ID: 5EAC28EA3F980CCF

View File

@ -14,7 +14,8 @@
from .base import RpcNode, action, child
from yubikit.core import NotSupportedError
from yubikit.core import NotSupportedError, CommandError
from yubikit.core.otp import modhex_encode, modhex_decode
from yubikit.yubiotp import (
YubiOtpSession,
SLOT,
@ -25,8 +26,17 @@ from yubikit.yubiotp import (
YubiOtpSlotConfiguration,
StaticTicketSlotConfiguration,
)
from typing import Dict
from ykman.otp import generate_static_pw
from yubikit.oath import parse_b32_key
from ykman.scancodes import KEYBOARD_LAYOUT, encode
from typing import Dict
import struct
_FAIL_MSG = (
"Failed to write to the YubiKey. Make sure the device does not "
"have restricted access"
)
class YubiOtpNode(RpcNode):
def __init__(self, connection):
@ -65,6 +75,23 @@ class YubiOtpNode(RpcNode):
def two(self):
return SlotNode(self.session, SLOT.TWO)
@action(closes_child=False)
def serial_modhex(self, params, event, signal):
serial = params["serial"]
return dict(
encoded=modhex_encode(b"\xff\x00" + struct.pack(b">I", serial))
)
@action(closes_child=False)
def generate_static(self, params, event, signal):
layout, length = params["layout"], int(params["length"])
return dict(
password=generate_static_pw(length, KEYBOARD_LAYOUT[layout])
)
@action(closes_child=False)
def keyboard_layouts(self, params, event, signal):
return {layout.name:[sc for sc in layout.value] for layout in KEYBOARD_LAYOUT}
_CONFIG_TYPES = dict(
hmac_sha1=HmacSha1SlotConfiguration,
@ -113,7 +140,10 @@ class SlotNode(RpcNode):
@action(condition=lambda self: self._maybe_configured(self.slot))
def delete(self, params, event, signal):
self.session.delete_slot(self.slot, params.pop("cur_acc_code", None))
try:
self.session.delete_slot(self.slot, params.pop("cur_acc_code", None))
except CommandError:
raise ValueError(_FAIL_MSG)
@action(condition=lambda self: self._can_calculate(self.slot))
def calculate(self, params, event, signal):
@ -121,7 +151,7 @@ class SlotNode(RpcNode):
response = self.session.calculate_hmac_sha1(self.slot, challenge, event)
return dict(response=response)
def _apply_config(self, config, params):
def _apply_options(self, config, options):
for option in (
"serial_api_visible",
"serial_usb_visible",
@ -140,39 +170,63 @@ class SlotNode(RpcNode):
"short_ticket",
"manual_update",
):
if option in params:
getattr(config, option)(params.pop(option))
if option in options:
getattr(config, option)(options.pop(option))
for option in ("tabs", "delay", "pacing", "strong_password"):
if option in params:
getattr(config, option)(*params.pop(option))
if option in options:
getattr(config, option)(*options.pop(option))
if "token_id" in params:
token_id, *args = params.pop("token_id")
if "token_id" in options:
token_id, *args = options.pop("token_id")
config.token_id(bytes.fromhex(token_id), *args)
return config
def _get_config(self, type, **kwargs):
config = None
if type in _CONFIG_TYPES:
if type == "hmac_sha1":
config = _CONFIG_TYPES[type](
bytes.fromhex(kwargs["key"])
)
elif type == "hotp":
config = _CONFIG_TYPES[type](
parse_b32_key(kwargs["key"])
)
elif type == "static_password":
config = _CONFIG_TYPES[type](
encode(kwargs["password"], KEYBOARD_LAYOUT[kwargs["keyboard_layout"]])
)
elif type == "yubiotp":
config = _CONFIG_TYPES[type](
fixed=modhex_decode(kwargs["public_id"]),
uid=bytes.fromhex(kwargs["private_id"]),
key=bytes.fromhex(kwargs["key"])
)
else:
raise ValueError("No supported configuration type provided.")
return config
@action
def put(self, params, event, signal):
config = None
for key in _CONFIG_TYPES:
if key in params:
if config is not None:
raise ValueError("Only one configuration type can be provided.")
config = _CONFIG_TYPES[key](
*(bytes.fromhex(arg) for arg in params.pop(key))
)
if config is None:
raise ValueError("No supported configuration type provided.")
self._apply_config(config, params)
self.session.put_configuration(
self.slot,
config,
params.pop("acc_code", None),
params.pop("cur_acc_code", None),
)
return dict()
type = params.pop("type")
options = params.pop("options", {})
args = params
config = self._get_config(type, **args)
self._apply_options(config, options)
try:
self.session.put_configuration(
self.slot,
config,
params.pop("acc_code", None),
params.pop("cur_acc_code", None),
)
return dict()
except CommandError:
raise ValueError(_FAIL_MSG)
@action(
condition=lambda self: self._state.version >= (2, 2, 0)
@ -180,7 +234,7 @@ class SlotNode(RpcNode):
)
def update(self, params, event, signal):
config = UpdateConfiguration()
self._apply_config(config, params)
self._apply_options(config, params)
self.session.update_configuration(
self.slot,
config,
@ -188,3 +242,4 @@ class SlotNode(RpcNode):
params.pop("cur_acc_code", None),
)
return dict()