mirror of
https://github.com/Yubico/yubioath-flutter.git
synced 2024-12-30 05:35:22 +03:00
125 lines
4.9 KiB
Python
125 lines
4.9 KiB
Python
# Copyright (c) 2021 Yubico AB
|
|
# All rights reserved.
|
|
#
|
|
# Redistribution and use in source and binary forms, with or
|
|
# without modification, are permitted provided that the following
|
|
# conditions are met:
|
|
#
|
|
# 1. Redistributions of source code must retain the above copyright
|
|
# notice, this list of conditions and the following disclaimer.
|
|
# 2. Redistributions in binary form must reproduce the above
|
|
# copyright notice, this list of conditions and the following
|
|
# disclaimer in the documentation and/or other materials provided
|
|
# with the distribution.
|
|
#
|
|
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
|
|
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
|
|
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
|
|
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
|
|
# COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
|
|
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
|
|
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
|
|
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
|
|
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
|
|
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
|
|
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
|
|
# POSSIBILITY OF SUCH DAMAGE.
|
|
|
|
|
|
from .base import RpcNode, action
|
|
from yubikit.core import require_version, NotSupportedError, TRANSPORT
|
|
from yubikit.core.smartcard import SmartCardConnection
|
|
from yubikit.core.otp import OtpConnection
|
|
from yubikit.core.fido import FidoConnection
|
|
from yubikit.management import ManagementSession, DeviceConfig, Mode, USB_INTERFACE
|
|
from ykman.device import connect_to_device
|
|
from dataclasses import asdict
|
|
from time import sleep
|
|
import logging
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class ManagementNode(RpcNode):
|
|
def __init__(self, connection):
|
|
super().__init__()
|
|
self._connection_type = type(connection)
|
|
self.session = ManagementSession(connection)
|
|
|
|
def get_data(self):
|
|
try:
|
|
return asdict(self.session.read_device_info())
|
|
except NotSupportedError:
|
|
return {}
|
|
|
|
def list_actions(self):
|
|
actions = super().list_actions()
|
|
try:
|
|
require_version(self.session.version, (5, 0, 0))
|
|
actions.remove("set_mode")
|
|
except NotSupportedError:
|
|
actions.remove("configure")
|
|
return actions
|
|
|
|
def _await_reboot(self, serial, usb_enabled):
|
|
# TODO: Clean up once "support" is merged into ykman.
|
|
iface = USB_INTERFACE.for_capabilities(usb_enabled)
|
|
connection_types = []
|
|
|
|
# Prefer to use the "same" connection type as before
|
|
if iface.supports_connection(self._connection_type):
|
|
if issubclass(self._connection_type, SmartCardConnection):
|
|
connection_types = [SmartCardConnection]
|
|
elif issubclass(self._connection_type, OtpConnection):
|
|
connection_types = [OtpConnection]
|
|
elif issubclass(self._connection_type, FidoConnection):
|
|
connection_types = [FidoConnection]
|
|
|
|
# Allow any expected connection type
|
|
if not connection_types:
|
|
connection_types = [
|
|
t
|
|
for t in [SmartCardConnection, OtpConnection, FidoConnection]
|
|
if iface.supports_connection(t)
|
|
]
|
|
|
|
self.session.close()
|
|
logger.debug("Waiting for device to re-appear...")
|
|
for _ in range(10):
|
|
sleep(0.2) # Always sleep initially
|
|
try:
|
|
conn = connect_to_device(serial, connection_types)[0]
|
|
conn.close()
|
|
break
|
|
except ValueError:
|
|
logger.debug("Not found, sleep...")
|
|
else:
|
|
logger.warning("Timed out waiting for device")
|
|
|
|
@action
|
|
def configure(self, params, event, signal):
|
|
reboot = params.pop("reboot", False)
|
|
cur_lock_code = bytes.fromhex(params.pop("cur_lock_code", "")) or None
|
|
new_lock_code = bytes.fromhex(params.pop("new_lock_code", "")) or None
|
|
config = DeviceConfig(
|
|
params.pop("enabled_capabilities", {}),
|
|
params.pop("auto_eject_timeout", None),
|
|
params.pop("challenge_response_timeout", None),
|
|
params.pop("device_flags", None),
|
|
)
|
|
serial = self.session.read_device_info().serial
|
|
self.session.write_device_config(config, reboot, cur_lock_code, new_lock_code)
|
|
if reboot:
|
|
enabled = config.enabled_capabilities.get(TRANSPORT.USB)
|
|
self._await_reboot(serial, enabled)
|
|
return dict()
|
|
|
|
@action
|
|
def set_mode(self, params, event, signal):
|
|
self.session.set_mode(
|
|
Mode.from_code(params["mode"]),
|
|
params.pop("challenge_response_timeout", 0),
|
|
params.pop("auto_eject_timeout", 0),
|
|
)
|
|
return dict()
|