RPC: Add "touch" signal to OATH calculate.

This commit is contained in:
Dain Nilsson 2022-04-28 12:56:31 +02:00
parent 0b9fc1b70b
commit d5b13fc044
No known key found for this signature in database
GPG Key ID: F04367096FBA95E8

View File

@ -43,6 +43,7 @@ from yubikit.oath import OathSession, CredentialData, OATH_TYPE, HASH_ALGORITHM
from dataclasses import asdict
from enum import Enum, unique
from time import time
from threading import Timer
import hmac
import os
import logging
@ -280,6 +281,7 @@ class CredentialNode(RpcNode):
self.session = session
self.credential = credential
self.refresh = refresh
self._touch = credential.touch_required
def _require_version(self, major, minor, micro):
try:
@ -291,29 +293,45 @@ class CredentialNode(RpcNode):
def get_info(self):
return asdict(self.credential)
@action
def code(self, params, event, signal):
timestamp = params.pop("timestamp", None)
def _do_with_touch(self, signal, action):
timer = None
try:
start = time()
code = self.session.calculate_code(self.credential, timestamp)
return asdict(code)
if self._touch:
signal("touch")
elif self.credential.oath_type == OATH_TYPE.HOTP:
def on_timeout():
signal("touch")
self._touch = True
timer = Timer(0.5, on_timeout)
timer.start()
return action()
except ApduError as e:
if e.sw == SW.SECURITY_CONDITION_NOT_SATISFIED and time() - start > 5:
raise TimeoutException()
raise
finally:
if timer:
timer.cancel()
@action
def code(self, params, event, signal):
timestamp = params.pop("timestamp", None)
code = self._do_with_touch(
signal, lambda: self.session.calculate_code(self.credential, timestamp)
)
return asdict(code)
@action
def calculate(self, params, event, signal):
challenge = decode_bytes(params.pop("challenge"))
try:
start = time()
response = self.session.calculate(self.credential.id, challenge)
return dict(response=response)
except ApduError as e:
if e.sw == SW.SECURITY_CONDITION_NOT_SATISFIED and time() - start > 5:
raise TimeoutException()
raise
response = self._do_with_touch(
signal, lambda: self.session.calculate(self.credential.id, challenge)
)
return dict(response=response)
@action
def delete(self, params, event, signal):