mirror of
https://github.com/Yubico/yubioath-flutter.git
synced 2024-11-26 10:33:15 +03:00
396 lines
13 KiB
Python
396 lines
13 KiB
Python
# Copyright (c) 2021 Yubico AB
|
|
# All rights reserved.
|
|
#
|
|
# Redistribution and use in source and binary forms, with or
|
|
# without modification, are permitted provided that the following
|
|
# conditions are met:
|
|
#
|
|
# 1. Redistributions of source code must retain the above copyright
|
|
# notice, this list of conditions and the following disclaimer.
|
|
# 2. Redistributions in binary form must reproduce the above
|
|
# copyright notice, this list of conditions and the following
|
|
# disclaimer in the documentation and/or other materials provided
|
|
# with the distribution.
|
|
#
|
|
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
|
|
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
|
|
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
|
|
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
|
|
# COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
|
|
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
|
|
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
|
|
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
|
|
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
|
|
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
|
|
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
|
|
# POSSIBILITY OF SUCH DAMAGE.
|
|
|
|
|
|
from .base import (
|
|
RpcNode,
|
|
action,
|
|
child,
|
|
RpcException,
|
|
TimeoutException,
|
|
AuthRequiredException,
|
|
)
|
|
from fido2.ctap import CtapError
|
|
from fido2.ctap2 import Ctap2, ClientPin
|
|
from fido2.ctap2.credman import CredentialManagement
|
|
from fido2.ctap2.bio import BioEnrollment, FPBioEnrollment, CaptureError
|
|
from fido2.pcsc import CtapPcscDevice
|
|
from yubikit.core.fido import FidoConnection
|
|
from ykman.hid import list_ctap_devices as list_ctap
|
|
from ykman.pcsc import list_devices as list_ccid
|
|
from smartcard.Exceptions import NoCardException, CardConnectionException
|
|
|
|
from dataclasses import asdict
|
|
from time import sleep
|
|
import logging
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class PinValidationException(RpcException):
|
|
def __init__(self, retries, auth_blocked):
|
|
super().__init__(
|
|
"pin-validation",
|
|
"Authentication is required",
|
|
dict(retries=retries, auth_blocked=auth_blocked),
|
|
)
|
|
|
|
|
|
def _ctap_id(ctap):
|
|
return (ctap.info.aaguid, ctap.info.firmware_version)
|
|
|
|
|
|
def _handle_pin_error(e, client_pin):
|
|
if e.code in (
|
|
CtapError.ERR.PIN_INVALID,
|
|
CtapError.ERR.PIN_BLOCKED,
|
|
CtapError.ERR.PIN_AUTH_BLOCKED,
|
|
):
|
|
pin_retries, _ = client_pin.get_pin_retries()
|
|
raise PinValidationException(
|
|
pin_retries, e.code == CtapError.ERR.PIN_AUTH_BLOCKED
|
|
)
|
|
raise e
|
|
|
|
|
|
class Ctap2Node(RpcNode):
|
|
def __init__(self, connection):
|
|
super().__init__()
|
|
self.ctap = Ctap2(connection)
|
|
self._info = self.ctap.info
|
|
self.client_pin = ClientPin(self.ctap)
|
|
self._auth_blocked = False
|
|
|
|
def get_data(self):
|
|
self._info = self.ctap.get_info()
|
|
logger.debug(f"Info: {self._info}")
|
|
data = dict(info=asdict(self._info), auth_blocked=self._auth_blocked)
|
|
if self._info.options.get("clientPin"):
|
|
pin_retries, power_cycle = self.client_pin.get_pin_retries()
|
|
data.update(
|
|
pin_retries=pin_retries,
|
|
power_cycle=power_cycle,
|
|
)
|
|
if self._info.options.get("bioEnroll"):
|
|
uv_retries = self.client_pin.get_uv_retries()
|
|
# For compatibility with python-fido2 < 1.0
|
|
if isinstance(uv_retries, tuple):
|
|
uv_retries = uv_retries[0]
|
|
data.update(uv_retries=uv_retries)
|
|
return data
|
|
|
|
def _prepare_reset_nfc(self, event, signal):
|
|
reader_name = self.ctap.device._name
|
|
devices = list_ccid(reader_name)
|
|
if not devices or devices[0].reader.name != reader_name:
|
|
raise ValueError("Unable to isolate NFC reader")
|
|
dev = devices[0]
|
|
logger.debug(f"Reset over NFC using reader: {dev.reader.name}")
|
|
|
|
signal("reset", dict(state="remove"))
|
|
removed = False
|
|
while not event.wait(0.5):
|
|
try:
|
|
with dev.open_connection(FidoConnection):
|
|
if removed:
|
|
sleep(1.0) # Wait for the device to settle
|
|
return dev.open_connection(FidoConnection)
|
|
except CardConnectionException:
|
|
pass # Expected, ignore
|
|
except NoCardException:
|
|
if not removed:
|
|
signal("reset", dict(state="insert"))
|
|
removed = True
|
|
|
|
raise TimeoutException()
|
|
|
|
def _prepare_reset_usb(self, event, signal):
|
|
dev_path = self.ctap.device.descriptor.path
|
|
logger.debug(f"Reset over USB: {dev_path}")
|
|
|
|
signal("reset", dict(state="remove"))
|
|
removed_state = None
|
|
while not event.wait(0.5):
|
|
keys = list_ctap()
|
|
present = {k.descriptor.path for k in keys}
|
|
if removed_state is None:
|
|
if dev_path not in present:
|
|
signal("reset", dict(state="insert"))
|
|
removed_state = present
|
|
else:
|
|
added = present - removed_state
|
|
if len(added) == 1:
|
|
dev_path = next(iter(added)) # Path may have changed
|
|
key = next(k for k in keys if k.descriptor.path == dev_path)
|
|
connection = key.open_connection(FidoConnection)
|
|
signal("reset", dict(state="touch"))
|
|
return connection
|
|
elif len(added) > 1:
|
|
raise ValueError("Multiple YubiKeys inserted")
|
|
|
|
raise TimeoutException()
|
|
|
|
@action
|
|
def reset(self, params, event, signal):
|
|
target = _ctap_id(self.ctap)
|
|
if isinstance(self.ctap.device, CtapPcscDevice):
|
|
connection = self._prepare_reset_nfc(event, signal)
|
|
else:
|
|
connection = self._prepare_reset_usb(event, signal)
|
|
|
|
logger.debug("Performing reset...")
|
|
self.ctap = Ctap2(connection)
|
|
if target != _ctap_id(self.ctap):
|
|
raise ValueError("Re-inserted YubiKey does not match initial device")
|
|
self.ctap.reset(event)
|
|
self._info = self.ctap.get_info()
|
|
self._auth_blocked = False
|
|
return dict()
|
|
|
|
@action(condition=lambda self: self._info.options["clientPin"])
|
|
def verify_pin(self, params, event, signal):
|
|
pin = params.pop("pin")
|
|
try:
|
|
self.client_pin.get_pin_token(
|
|
pin, ClientPin.PERMISSION.GET_ASSERTION, "ykman.example.com"
|
|
)
|
|
return dict()
|
|
except CtapError as e:
|
|
return _handle_pin_error(e, self.client_pin)
|
|
|
|
@action
|
|
def set_pin(self, params, event, signal):
|
|
has_pin = self.ctap.get_info().options["clientPin"]
|
|
try:
|
|
if has_pin:
|
|
self.client_pin.change_pin(
|
|
params.pop("pin"),
|
|
params.pop("new_pin"),
|
|
)
|
|
else:
|
|
self.client_pin.set_pin(
|
|
params.pop("new_pin"),
|
|
)
|
|
return dict()
|
|
except CtapError as e:
|
|
return _handle_pin_error(e, self.client_pin)
|
|
|
|
@child(condition=lambda self: BioEnrollment.is_supported(self._info))
|
|
def fingerprints(self):
|
|
return FingerprintsNode(self.client_pin)
|
|
|
|
@child(condition=lambda self: CredentialManagement.is_supported(self._info))
|
|
def credentials(self):
|
|
return CredentialsRpsNode(self.client_pin)
|
|
token = self.client_pin.get_pin_token(
|
|
self._pin, ClientPin.PERMISSION.CREDENTIAL_MGMT
|
|
)
|
|
creds = CredentialManagement(self.ctap, self.client_pin.protocol, token)
|
|
return CredentialsRpsNode(creds)
|
|
|
|
|
|
class CredentialsRpsNode(RpcNode):
|
|
def __init__(self, client_pin):
|
|
super().__init__()
|
|
self.client_pin = client_pin
|
|
self.credman = None
|
|
self._rps = {}
|
|
|
|
def get_data(self):
|
|
return dict(locked=self.credman is None)
|
|
|
|
@action
|
|
def unlock(self, params, event, signal):
|
|
pin = params.pop("pin")
|
|
try:
|
|
token = self.client_pin.get_pin_token(
|
|
pin, ClientPin.PERMISSION.CREDENTIAL_MGMT
|
|
)
|
|
self.credman = CredentialManagement(
|
|
self.client_pin.ctap, self.client_pin.protocol, token
|
|
)
|
|
self.refresh()
|
|
return dict()
|
|
except CtapError as e:
|
|
return _handle_pin_error(e, self.client_pin)
|
|
|
|
def refresh(self):
|
|
data = self.credman.get_metadata()
|
|
if data.get(CredentialManagement.RESULT.EXISTING_CRED_COUNT) == 0:
|
|
self._rps = {}
|
|
else:
|
|
self._rps = {
|
|
rp[CredentialManagement.RESULT.RP]["id"]: dict(
|
|
rp_id=rp[CredentialManagement.RESULT.RP]["id"],
|
|
rp_id_hash=rp[CredentialManagement.RESULT.RP_ID_HASH],
|
|
)
|
|
for rp in self.credman.enumerate_rps()
|
|
}
|
|
|
|
def list_children(self):
|
|
return self._rps
|
|
|
|
def create_child(self, name):
|
|
if name in self._rps:
|
|
return CredentialsRpNode(self.credman, self._rps[name], self.refresh)
|
|
return super().create_child(name)
|
|
|
|
|
|
class CredentialsRpNode(RpcNode):
|
|
def __init__(self, credman, rp_data, refresh):
|
|
super().__init__()
|
|
self.credman = credman
|
|
self.data = rp_data
|
|
self.refresh_rps = refresh
|
|
self.refresh()
|
|
|
|
def refresh(self):
|
|
self.refresh_rps()
|
|
self._creds = {
|
|
cred[CredentialManagement.RESULT.CREDENTIAL_ID]["id"].hex(): dict(
|
|
credential_id=cred[CredentialManagement.RESULT.CREDENTIAL_ID],
|
|
user_id=cred[CredentialManagement.RESULT.USER]["id"],
|
|
user_name=cred[CredentialManagement.RESULT.USER]["name"],
|
|
)
|
|
for cred in self.credman.enumerate_creds(self.data["rp_id_hash"])
|
|
}
|
|
|
|
def list_children(self):
|
|
return self._creds
|
|
|
|
def create_child(self, name):
|
|
if name in self._creds:
|
|
return CredentialNode(
|
|
self.credman,
|
|
self._creds[name],
|
|
self.refresh,
|
|
)
|
|
return super().create_child(name)
|
|
|
|
|
|
class CredentialNode(RpcNode):
|
|
def __init__(self, credman, credential_data, refresh):
|
|
super().__init__()
|
|
self.credman = credman
|
|
self.data = credential_data
|
|
self.refresh = refresh
|
|
|
|
def get_data(self):
|
|
return self.data
|
|
|
|
@action
|
|
def delete(self, params, event, signal):
|
|
self.credman.delete_cred(self.data["credential_id"])
|
|
self.refresh()
|
|
|
|
|
|
class FingerprintsNode(RpcNode):
|
|
def __init__(self, client_pin):
|
|
super().__init__()
|
|
self.client_pin = client_pin
|
|
self.bio = None
|
|
self._templates = {}
|
|
|
|
def get_data(self):
|
|
return dict(locked=self.bio is None)
|
|
|
|
@action
|
|
def unlock(self, params, event, signal):
|
|
pin = params.pop("pin")
|
|
try:
|
|
token = self.client_pin.get_pin_token(pin, ClientPin.PERMISSION.BIO_ENROLL)
|
|
self.bio = FPBioEnrollment(
|
|
self.client_pin.ctap, self.client_pin.protocol, token
|
|
)
|
|
self.refresh()
|
|
return dict()
|
|
except CtapError as e:
|
|
return _handle_pin_error(e, self.client_pin)
|
|
|
|
def refresh(self):
|
|
self._templates = {
|
|
# Treat empty strings as None
|
|
k: v if v else None
|
|
for k, v in self.bio.enumerate_enrollments().items()
|
|
}
|
|
|
|
def list_children(self):
|
|
return {
|
|
template_id.hex(): dict(name=name)
|
|
for template_id, name in self._templates.items()
|
|
}
|
|
|
|
def create_child(self, name):
|
|
template_id = bytes.fromhex(name)
|
|
if template_id in self._templates:
|
|
return FingerprintNode(
|
|
self.bio, template_id, self._templates[template_id], self.refresh
|
|
)
|
|
return super().create_child(name)
|
|
|
|
@action
|
|
def add(self, params, event, signal):
|
|
if self.bio is None:
|
|
raise AuthRequiredException()
|
|
name = params.get("name", None)
|
|
enroller = self.bio.enroll()
|
|
template_id = None
|
|
while template_id is None:
|
|
try:
|
|
template_id = enroller.capture(event)
|
|
signal("capture", dict(remaining=enroller.remaining))
|
|
except CaptureError as e:
|
|
signal("capture-error", dict(code=e.code))
|
|
if name:
|
|
self.bio.set_name(template_id, name)
|
|
self._templates[template_id] = name
|
|
return dict(template_id=template_id, name=name)
|
|
|
|
|
|
class FingerprintNode(RpcNode):
|
|
def __init__(self, bio, template_id, name, refresh):
|
|
super().__init__()
|
|
self.bio = bio
|
|
self.refresh = refresh
|
|
self.template_id = template_id
|
|
self.name = name
|
|
|
|
def get_data(self):
|
|
return dict(template_id=self.template_id, name=self.name)
|
|
|
|
@action
|
|
def rename(self, params, event, signal):
|
|
name = params.pop("name")
|
|
self.bio.set_name(self.template_id, name)
|
|
self.name = name
|
|
self.refresh()
|
|
|
|
@action
|
|
def delete(self, params, event, signal):
|
|
self.bio.remove_enrollment(self.template_id)
|
|
self.refresh()
|