RPC: Support management for YK <4.

This commit is contained in:
Dain Nilsson 2022-03-14 10:52:33 +01:00
parent a48079864c
commit 30d3dff644
No known key found for this signature in database
GPG Key ID: F04367096FBA95E8
2 changed files with 26 additions and 4 deletions

View File

@ -31,7 +31,7 @@ from yubikit.core import require_version, NotSupportedError, TRANSPORT
from yubikit.core.smartcard import SmartCardConnection
from yubikit.core.otp import OtpConnection
from yubikit.core.fido import FidoConnection
from yubikit.management import ManagementSession, DeviceConfig, USB_INTERFACE
from yubikit.management import ManagementSession, DeviceConfig, Mode, USB_INTERFACE
from ykman.device import connect_to_device
from dataclasses import asdict
from time import sleep
@ -47,7 +47,10 @@ class ManagementNode(RpcNode):
self.session = ManagementSession(connection)
def get_data(self):
try:
return asdict(self.session.read_device_info())
except NotSupportedError:
return {}
def list_actions(self):
actions = super().list_actions()
@ -114,7 +117,7 @@ class ManagementNode(RpcNode):
@action
def set_mode(self, params, event, signal):
self.session.set_mode(
params.pop("mode"),
Mode.from_code(params["mode"]),
params.pop("challenge_response_timeout", 0),
params.pop("auto_eject_timeout", 0),
)

View File

@ -7,6 +7,7 @@ import subprocess # nosec
import sys
import logging
from threading import Thread
from typing import IO, cast
logger = logging.getLogger(__name__)
@ -108,7 +109,7 @@ class RpcShell(cmd.Cmd):
return result
elif kind == "error":
status = result["status"]
print(red(f"{status.upper()}: {result['body']}"))
print(red(f"{status.upper()}: {result['message']}"))
else:
print(red(f"Invalid response: {result}"))
@ -209,6 +210,21 @@ class RpcShell(cmd.Cmd):
return True
def log_stderr(stderr):
while True:
line = stderr.readline()
if not line:
break
try:
record = json.loads(line)
print(red(f"{record['level']} {record['name']}: {record['message']}"))
exc_text = record.get("exc_text")
if exc_text:
print(red(exc_text))
except Exception:
logger.exception(f"Failed to parse error: {line}")
@click.command()
@click.argument("executable", nargs=-1)
def shell(executable):
@ -217,9 +233,12 @@ def shell(executable):
executable or [sys.executable, "ykman-rpc.py"],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
encoding="utf8",
)
Thread(daemon=True, target=log_stderr, args=(rpc.stderr,)).start()
click.echo("Shell starting...")
shell = RpcShell(rpc.stdin, cast(IO[str], rpc.stdout))
shell.cmdloop()