yubioath-flutter/ykman-rpc/rpc/device.py
2022-03-11 13:31:26 +01:00

362 lines
12 KiB
Python

# Copyright (c) 2021 Yubico AB
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following
# disclaimer in the documentation and/or other materials provided
# with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
# COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
from .base import RpcNode, child, action, NoSuchNodeException, ChildResetException
from .oath import OathNode
from .fido import Ctap2Node
from .yubiotp import YubiOtpNode
from .management import ManagementNode
from .qr import scan_qr
from ykman import __version__ as ykman_version
from ykman.base import PID
from ykman.device import (
scan_devices,
list_all_devices,
get_name,
read_info,
)
from ykman.diagnostics import get_diagnostics
from ykman.logging import set_log_level
from yubikit.core import TRANSPORT
from yubikit.core.smartcard import SmartCardConnection, ApduError, SW
from yubikit.core.otp import OtpConnection
from yubikit.core.fido import FidoConnection
from yubikit.management import CAPABILITY
from yubikit.logging import LOG_LEVEL
from ykman.pcsc import list_devices, YK_READER_NAME
from smartcard.Exceptions import SmartcardException
from dataclasses import asdict
from typing import Mapping, Tuple
import os
import logging
logger = logging.getLogger(__name__)
class RootNode(RpcNode):
def __init__(self):
super().__init__()
self._devices = DevicesNode()
self._readers = ReadersNode()
def __call__(self, *args):
result = super().__call__(*args)
if result is None:
result = {}
return result
def get_child(self, name):
self._child = self.create_child(name)
self._child_name = name
return self._child
def get_data(self):
return dict(version=ykman_version)
@child
def usb(self):
return self._devices
@child
def nfc(self):
return self._readers
@action
def diagnose(self, *ignored):
return dict(diagnostics=get_diagnostics())
@action(closes_child=False)
def logging(self, params, event, signal):
level = LOG_LEVEL[params["level"].upper()]
set_log_level(level)
logger.info(f"Log level set to: {level.name}")
return dict()
@action(closes_child=False)
def qr(self, params, event, signal):
return dict(result=scan_qr())
class ReadersNode(RpcNode):
def __init__(self):
super().__init__()
self._state = set()
self._readers = {}
self._reader_mapping = {}
@action(closes_child=False)
def scan(self, *ignored):
return self.list_children()
def list_children(self):
devices = [
d for d in list_devices("") if YK_READER_NAME not in d.reader.name.lower()
]
state = {d.reader.name for d in devices}
if self._state != state:
self._readers = {}
self._reader_mapping = {}
for device in devices:
dev_id = os.urandom(4).hex()
self._reader_mapping[dev_id] = device
self._readers[dev_id] = dict(name=device.reader.name)
self._state = state
return self._readers
def create_child(self, name):
return ReaderDeviceNode(self._reader_mapping[name], None)
class _ScanDevices:
def __init__(self):
self._state: Tuple[Mapping[PID, int], int] = ({}, 0)
self._caching = False
def __call__(self):
if not self._caching or not self._state[1]:
self._state = scan_devices()
return self._state
def __enter__(self):
self._caching = True
self._state = ({}, 0)
def __exit__(self, exc_type, exc, exc_tb):
self._caching = False
class DevicesNode(RpcNode):
def __init__(self):
super().__init__()
self._get_state = _ScanDevices()
self._list_state = 0
self._devices = {}
self._device_mapping = {}
def __call__(self, *args, **kwargs):
with self._get_state:
return super().__call__(*args, **kwargs)
@action(closes_child=False)
def scan(self, *ignored):
return self.get_data()
def get_data(self):
state = self._get_state()
return dict(state=state[1], pids=state[0])
def list_children(self):
state = self._get_state()
if state[1] != self._list_state:
logger.debug(f"State changed (was={self._list_state}, now={state[1]})")
self._devices = {}
self._device_mapping = {}
for dev, info in list_all_devices():
dev_id = str(info.serial) if info.serial else os.urandom(4).hex()
while dev_id in self._device_mapping:
dev_id = os.urandom(4).hex()
self._device_mapping[dev_id] = (dev, info)
name = get_name(info, dev.pid.get_type() if dev.pid else None)
self._devices[dev_id] = dict(pid=dev.pid, name=name, serial=info.serial)
if sum(state[0].values()) == len(self._devices):
self._list_state = state[1]
logger.debug("State updated: {state[1]}")
else:
logger.warning("Not all devices identified")
self._list_state = 0
return self._devices
def create_child(self, name):
return UsbDeviceNode(*self._device_mapping[name])
class AbstractDeviceNode(RpcNode):
def __init__(self, device, info):
super().__init__()
self._device = device
self._info = info
def __call__(self, *args, **kwargs):
try:
return super().__call__(*args, **kwargs)
except (SmartcardException, OSError) as e:
logger.error("Device error", exc_info=e)
self._child = None
name = self._child_name
self._child_name = None
raise NoSuchNodeException(name)
def create_child(self, name):
try:
return super().create_child(name)
except (SmartcardException, OSError):
logger.error(f"Unable to create child {name}", exc_info=True)
raise NoSuchNodeException(name)
def get_data(self):
for conn_type in (SmartCardConnection, OtpConnection, FidoConnection):
if self._device.supports_connection(conn_type):
try:
with self._device.open_connection(conn_type) as conn:
pid = self._device.pid
self._info = read_info(pid, conn)
name = get_name(self._info, pid.get_type() if pid else None)
return dict(
pid=pid,
name=name,
transport=self._device.transport,
info=asdict(self._info),
)
except Exception:
logger.error(f"Unable to connect via {conn_type}", exc_info=True)
raise ValueError("No supported connections")
class UsbDeviceNode(AbstractDeviceNode):
def __init__(self, device, info):
super().__init__(device, info)
def _supports_connection(self, conn_type):
return self._device.supports_connection(conn_type)
def _create_connection(self, conn_type):
connection = self._device.open_connection(conn_type)
return ConnectionNode(self._device, connection, self._info)
@child(condition=lambda self: self._supports_connection(SmartCardConnection))
def ccid(self):
return self._create_connection(SmartCardConnection)
@child(condition=lambda self: self._supports_connection(OtpConnection))
def otp(self):
return self._create_connection(OtpConnection)
@child(condition=lambda self: self._supports_connection(FidoConnection))
def fido(self):
return self._create_connection(FidoConnection)
class ReaderDeviceNode(AbstractDeviceNode):
def get_data(self):
try:
return super().get_data() | dict(present=True)
except Exception:
return dict(present=False)
@child
def ccid(self):
connection = self._device.open_connection(SmartCardConnection)
info = read_info(None, connection)
return ConnectionNode(self._device, connection, info)
@child
def fido(self):
with self._device.open_connection(SmartCardConnection) as conn:
info = read_info(None, conn)
connection = self._device.open_connection(FidoConnection)
return ConnectionNode(self._device, connection, info)
class ConnectionNode(RpcNode):
def __init__(self, device, connection, info):
super().__init__()
self._device = device
self._transport = device.transport
self._connection = connection
self._info = info or read_info(device.pid, self._connection)
def __call__(self, *args, **kwargs):
try:
return super().__call__(*args, **kwargs)
except (SmartcardException, OSError) as e:
logger.error("Connection error", exc_info=e)
raise ChildResetException(f"{e}")
except ApduError as e:
if e.sw == SW.INVALID_INSTRUCTION:
raise ChildResetException(f"SW: {e.sw}")
raise e
@property
def capabilities(self):
return self._info.config.enabled_capabilities[self._transport]
def close(self):
super().close()
try:
self._connection.close()
except Exception as e:
logger.warning("Error closing connection", exc_info=e)
def get_data(self):
if (
isinstance(self._connection, SmartCardConnection)
or self._transport == TRANSPORT.USB
):
self._info = read_info(self._device.pid, self._connection)
return dict(version=self._info.version, serial=self._info.serial)
@child(
condition=lambda self: self._transport == TRANSPORT.USB
or isinstance(self._connection, SmartCardConnection)
)
def management(self):
return ManagementNode(self._connection)
@child(
condition=lambda self: isinstance(self._connection, SmartCardConnection)
and CAPABILITY.OATH in self.capabilities
)
def oath(self):
return OathNode(self._connection)
@child(
condition=lambda self: isinstance(self._connection, FidoConnection)
and CAPABILITY.FIDO2 in self.capabilities
)
def ctap2(self):
return Ctap2Node(self._connection)
@child(
condition=lambda self: CAPABILITY.OTP in self.capabilities
and (
isinstance(self._connection, OtpConnection)
or ( # SmartCardConnection can be used over NFC, or on 5.3 and later.
isinstance(self._connection, SmartCardConnection)
and (
self._transport == TRANSPORT.NFC or self._info.version >= (5, 3, 0)
)
)
)
)
def yubiotp(self):
return YubiOtpNode(self._connection)