Helper: Improve connection switching when toggling interfaces

This commit is contained in:
Dain Nilsson 2024-09-13 12:57:55 +02:00
parent 5ba095777e
commit 3099cfc75e
No known key found for this signature in database
GPG Key ID: F04367096FBA95E8
3 changed files with 40 additions and 17 deletions

View File

@ -202,7 +202,12 @@ class DevicesNode(RpcNode):
def __call__(self, *args, **kwargs): def __call__(self, *args, **kwargs):
with self._get_state: with self._get_state:
try: try:
return super().__call__(*args, **kwargs) response = super().__call__(*args, **kwargs)
if "device_closed" in response.flags:
self._list_state = 0
self._device_mapping = {}
response.flags.remove("device_closed")
return response
except ConnectionException as e: except ConnectionException as e:
if self._failing_connection == e.body: if self._failing_connection == e.body:
self._retries += 1 self._retries += 1
@ -270,6 +275,13 @@ class AbstractDeviceNode(RpcNode):
def __call__(self, *args, **kwargs): def __call__(self, *args, **kwargs):
try: try:
response = super().__call__(*args, **kwargs) response = super().__call__(*args, **kwargs)
# The command resulted in the device closing
if "device_closed" in response.flags:
self.close()
return response
# The command resulted in device_info modification
if "device_info" in response.flags: if "device_info" in response.flags:
old_info = self._info old_info = self._info
# Refresh data # Refresh data
@ -296,7 +308,9 @@ class AbstractDeviceNode(RpcNode):
raise NoSuchNodeException(name) raise NoSuchNodeException(name)
def get_data(self): def get_data(self):
return self._data if self._data:
return self._data
raise ChildResetException("Unable to read device data")
def _refresh_data(self): def _refresh_data(self):
... ...
@ -326,7 +340,12 @@ class UsbDeviceNode(AbstractDeviceNode):
if self._child and not self._child.closed: if self._child and not self._child.closed:
# Make sure to close any open session # Make sure to close any open session
self._child._close_child() self._child._close_child()
return self._read_data(self._child._connection) try:
return self._read_data(self._child._connection)
except Exception:
logger.warning(
f"Unable to use {self._child._connection}", exc_info=True
)
# No child, open new connection # No child, open new connection
for conn_type in (SmartCardConnection, OtpConnection, FidoConnection): for conn_type in (SmartCardConnection, OtpConnection, FidoConnection):
@ -336,7 +355,9 @@ class UsbDeviceNode(AbstractDeviceNode):
return self._read_data(conn) return self._read_data(conn)
except Exception: except Exception:
logger.warning(f"Unable to connect via {conn_type}", exc_info=True) logger.warning(f"Unable to connect via {conn_type}", exc_info=True)
raise ValueError("No supported connections") # Failed to refresh, close
self.close()
return None
@child(condition=lambda self: self._supports_connection(SmartCardConnection)) @child(condition=lambda self: self._supports_connection(SmartCardConnection))
def ccid(self): def ccid(self):

View File

@ -203,7 +203,7 @@ class Ctap2Node(RpcNode):
raise raise
self._info = self.ctap.get_info() self._info = self.ctap.get_info()
self._token = None self._token = None
return RpcResponse(dict(), ["device_info"]) return RpcResponse(dict(), ["device_info", "device_closed"])
@action(condition=lambda self: self._info.options["clientPin"]) @action(condition=lambda self: self._info.options["clientPin"])
def unlock(self, params, event, signal): def unlock(self, params, event, signal):

View File

@ -51,20 +51,20 @@ class ManagementNode(RpcNode):
def _await_reboot(self, serial, usb_enabled): def _await_reboot(self, serial, usb_enabled):
ifaces = CAPABILITY(usb_enabled or 0).usb_interfaces ifaces = CAPABILITY(usb_enabled or 0).usb_interfaces
types: list[Type[Connection]] = [
SmartCardConnection,
OtpConnection,
# mypy doesn't support ABC.register()
FidoConnection, # type: ignore
]
connection_types = [t for t in types if t.usb_interface in ifaces]
# Prefer to use the "same" connection type as before # Prefer to use the "same" connection type as before
if self._connection_type.usb_interface in ifaces: if self._connection_type in connection_types:
connection_types = [self._connection_type] connection_types.remove(self._connection_type)
else: connection_types.insert(0, self._connection_type)
types: list[Type[Connection]] = [
SmartCardConnection,
OtpConnection,
# mypy doesn't support ABC.register()
FidoConnection, # type: ignore
]
connection_types = [t for t in types if t.usb_interface in ifaces]
self.session.close() self.session.close()
logger.debug("Waiting for device to re-appear...") logger.debug(f"Waiting for device to re-appear over {connection_types}...")
for _ in range(10): for _ in range(10):
sleep(0.2) # Always sleep initially sleep(0.2) # Always sleep initially
for dev, info in list_all_devices(connection_types): for dev, info in list_all_devices(connection_types):
@ -87,10 +87,12 @@ class ManagementNode(RpcNode):
) )
serial = self.session.read_device_info().serial serial = self.session.read_device_info().serial
self.session.write_device_config(config, reboot, cur_lock_code, new_lock_code) self.session.write_device_config(config, reboot, cur_lock_code, new_lock_code)
flags = ["device_info"]
if reboot: if reboot:
enabled = config.enabled_capabilities.get(TRANSPORT.USB) enabled = config.enabled_capabilities.get(TRANSPORT.USB)
flags.append("device_closed")
self._await_reboot(serial, enabled) self._await_reboot(serial, enabled)
return RpcResponse(dict(), ["device_info"]) return RpcResponse(dict(), flags)
@action @action
def set_mode(self, params, event, signal): def set_mode(self, params, event, signal):