diff --git a/helper/helper/device.py b/helper/helper/device.py index d5d79f93..174f2f19 100644 --- a/helper/helper/device.py +++ b/helper/helper/device.py @@ -31,18 +31,21 @@ from ykman.base import PID from ykman.device import scan_devices, list_all_devices from ykman.diagnostics import get_diagnostics from ykman.logging import set_log_level -from yubikit.core import TRANSPORT +from yubikit.core import TRANSPORT, NotSupportedError from yubikit.core.smartcard import SmartCardConnection, ApduError, SW +from yubikit.core.smartcard.scp import Scp11KeyParams from yubikit.core.otp import OtpConnection from yubikit.core.fido import FidoConnection from yubikit.support import get_name, read_info from yubikit.management import CAPABILITY +from yubikit.securitydomain import SecurityDomainSession from yubikit.logging import LOG_LEVEL from ykman.pcsc import list_devices, YK_READER_NAME from smartcard.Exceptions import SmartcardException, NoCardException from smartcard.pcsc.PCSCExceptions import EstablishContextException from smartcard.CardMonitoring import CardObserver, CardMonitor +from cryptography.hazmat.primitives.asymmetric.ec import EllipticCurvePublicKey from hashlib import sha256 from dataclasses import asdict from typing import Mapping, Tuple @@ -381,7 +384,7 @@ class ReaderDeviceNode(AbstractDeviceNode): try: connection = self._device.open_connection(SmartCardConnection) info = read_info(connection) - return ConnectionNode(self._device, connection, info) + return ScpConnectionNode(self._device, connection, info) except (ValueError, SmartcardException, EstablishContextException) as e: logger.warning("Error opening connection", exc_info=True) raise ConnectionException(self._device.fingerprint, "ccid", e) @@ -436,33 +439,36 @@ class ConnectionNode(RpcNode): self._info = read_info(self._connection, self._device.pid) return dict(version=self._info.version, serial=self._info.serial) + def _init_child_node(self, child_cls): + return child_cls(self._connection) + @child( condition=lambda self: self._transport == TRANSPORT.USB or isinstance(self._connection, SmartCardConnection) ) def management(self): - return ManagementNode(self._connection) + return self._init_child_node(ManagementNode) @child( condition=lambda self: isinstance(self._connection, SmartCardConnection) and CAPABILITY.OATH in self.capabilities ) def oath(self): - return OathNode(self._connection) + return self._init_child_node(OathNode) @child( condition=lambda self: isinstance(self._connection, SmartCardConnection) and CAPABILITY.PIV in self.capabilities ) def piv(self): - return PivNode(self._connection) + return self._init_child_node(PivNode) @child( condition=lambda self: isinstance(self._connection, FidoConnection) and CAPABILITY.FIDO2 in self.capabilities ) def ctap2(self): - return Ctap2Node(self._connection) + return self._init_child_node(Ctap2Node) @child( condition=lambda self: CAPABILITY.OTP in self.capabilities @@ -479,4 +485,27 @@ class ConnectionNode(RpcNode): ) ) def yubiotp(self): - return YubiOtpNode(self._connection) + return self._init_child_node(YubiOtpNode) + + +class ScpConnectionNode(ConnectionNode): + def __init__(self, device, connection, info): + super().__init__(device, connection, info) + + self.scp_params = None + try: + scp = SecurityDomainSession(connection) + + for ref in scp.get_key_information().keys(): + if ref.kid == 0x13: + chain = scp.get_certificate_bundle(ref) + if chain: + pub_key = chain[-1].public_key() + assert isinstance(pub_key, EllipticCurvePublicKey) # nosec + self.scp_params = Scp11KeyParams(ref, pub_key) + break + except NotSupportedError: + pass + + def _init_child_node(self, child_cls): + return child_cls(self._connection, self.scp_params) diff --git a/helper/helper/management.py b/helper/helper/management.py index 8fc02438..d367b0c1 100644 --- a/helper/helper/management.py +++ b/helper/helper/management.py @@ -28,10 +28,10 @@ logger = logging.getLogger(__name__) class ManagementNode(RpcNode): - def __init__(self, connection): + def __init__(self, connection, scp_params=None): super().__init__() self._connection_type: Type[Connection] = type(connection) - self.session = ManagementSession(connection) + self.session = ManagementSession(connection, scp_params) def get_data(self): try: diff --git a/helper/helper/oath.py b/helper/helper/oath.py index 1f434d19..bb21873c 100644 --- a/helper/helper/oath.py +++ b/helper/helper/oath.py @@ -77,9 +77,9 @@ class OathNode(RpcNode): logger.warning("Failed to unwrap access key", exc_info=True) return None - def __init__(self, connection): + def __init__(self, connection, scp_params=None): super().__init__() - self.session = OathSession(connection) + self.session = OathSession(connection, scp_params) self._key_verifier = None if self.session.locked: diff --git a/helper/helper/piv.py b/helper/helper/piv.py index 46827120..0270990f 100644 --- a/helper/helper/piv.py +++ b/helper/helper/piv.py @@ -91,9 +91,9 @@ def _handle_pin_puk_error(e): class PivNode(RpcNode): - def __init__(self, connection): + def __init__(self, connection, scp_params=None): super().__init__() - self.session = PivSession(connection) + self.session = PivSession(connection, scp_params) self._pivman_data = get_pivman_data(self.session) self._authenticated = False diff --git a/helper/helper/yubiotp.py b/helper/helper/yubiotp.py index 0f9eaf59..83ac55eb 100644 --- a/helper/helper/yubiotp.py +++ b/helper/helper/yubiotp.py @@ -40,9 +40,9 @@ _FAIL_MSG = ( class YubiOtpNode(RpcNode): - def __init__(self, connection): + def __init__(self, connection, scp_params=None): super().__init__() - self.session = YubiOtpSession(connection) + self.session = YubiOtpSession(connection, scp_params) def get_data(self): state = self.session.get_config_state()