yubioath-flutter/py/yubikey.py

220 lines
8.1 KiB
Python
Raw Normal View History

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import json
import types
2017-02-03 14:24:41 +03:00
import re
2017-02-15 12:13:57 +03:00
from base64 import b32decode, b64decode
2017-02-08 13:55:53 +03:00
from binascii import a2b_hex, b2a_hex
from ykman.descriptor import get_descriptors
2017-02-28 13:24:56 +03:00
from ykman.util import CAPABILITY, TRANSPORT, derive_key, parse_uri, parse_b32_key
2017-02-23 12:40:24 +03:00
from ykman.driver_otp import YkpersError
2017-02-01 18:27:45 +03:00
from ykman.oath import OathController, Credential
2017-03-03 16:22:54 +03:00
from qr import qrparse
from qr import qrdecode
# Capabilities that Controller.get_features() leaves out of the list it returns.
NON_FEATURE_CAPABILITIES = [CAPABILITY.CCID, CAPABILITY.NFC]
def as_json(f):
    """Wrap *f* so it takes JSON-encoded arguments and returns JSON text.

    Every positional argument passed to the wrapper is decoded with
    ``json.loads`` before *f* is called, and the value *f* returns is
    encoded with ``json.dumps``.

    The wrapper keeps the metadata of *f* (``__name__``, ``__doc__``, ...)
    so wrapped callables remain identifiable when debugging.
    """
    # Imported locally so this function does not depend on a file-level
    # import being present.
    import functools

    @functools.wraps(f)
    def wrapped(*args):
        decoded = [json.loads(a) for a in args]
        return json.dumps(f(*decoded))

    return wrapped
class Controller(object):
    """Facade over a single connected YubiKey.

    On construction every public bound method (name not starting with
    ``_``) is replaced on the instance by an ``as_json`` wrapper, so
    callers of an *instance* pass and receive JSON strings. Private
    helpers keep their plain Python signatures.
    """

    # Descriptor of the selected device; reset to None by refresh() when
    # the number of connected devices is not exactly one.
    _descriptor = None
    # Device information dict built by refresh().
    _dev_info = None

    def __init__(self):
        # Wrap all args and return values as JSON.
        for attr_name in dir(self):
            if attr_name.startswith('_'):
                continue
            func = getattr(self, attr_name)
            if isinstance(func, types.MethodType):
                setattr(self, attr_name, as_json(func))

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    def _open_oath(self):
        """Open the device over CCID and return an OathController for it."""
        dev = self._descriptor.open_device(TRANSPORT.CCID)
        return OathController(dev.driver)

    def _open_unlocked_oath(self, password_key):
        """Return an OathController, validated with *password_key* if needed.

        Validation is only attempted when the controller reports itself as
        locked and *password_key* (a hex string) is not None.
        """
        controller = self._open_oath()
        if controller.locked and password_key is not None:
            controller.validate(a2b_hex(password_key))
        return controller

    def _slot_name(self, slot):
        """Return the display name used for credentials stored in *slot*."""
        return "YubiKey Slot {}".format(slot)

    def _expiration(self, timestamp):
        """Return the first multiple of 30 strictly greater than *timestamp*."""
        return ((timestamp + 30) // 30) * 30

    def _read_slot_cred(self, slot, digits, timestamp):
        """Try to compute a TOTP code from *slot* without waiting for touch.

        Returns a Credential carrying the code on success, a code-less
        Credential flagged ``touch=True`` when the driver raises
        YkpersError with errno 11, and None on any other failure.
        """
        try:
            dev = self._descriptor.open_device(TRANSPORT.OTP)
            code = dev.driver.calculate(
                slot, challenge=timestamp, totp=True, digits=int(digits),
                wait_for_touch=False)
            return Credential(
                self._slot_name(slot), code=code, oath_type='totp',
                touch=False, algo='SHA1',
                expiration=self._expiration(timestamp))
        except YkpersError as e:
            # NOTE(review): errno 11 presumably means the slot requires a
            # touch -- confirm against the ykpers error codes.
            if e.errno == 11:
                return Credential(
                    self._slot_name(slot), oath_type='totp', touch=True,
                    algo='SHA1')
        except Exception:
            # Deliberately best-effort: a slot that cannot be read is
            # reported as absent. ``Exception`` (rather than a bare
            # except) lets KeyboardInterrupt/SystemExit propagate.
            pass
        return None

    def _calculate(self, credential, timestamp, password_key):
        """Calculate a code for one Credential object at *timestamp*."""
        controller = self._open_unlocked_oath(password_key)
        return controller.calculate(credential, timestamp)

    def _calculate_all(self, timestamp, password_key):
        """Calculate all credentials at *timestamp*, omitting hidden ones."""
        controller = self._open_unlocked_oath(password_key)
        return [c for c in controller.calculate_all(timestamp)
                if not c.hidden]

    # ------------------------------------------------------------------
    # Device discovery
    # ------------------------------------------------------------------

    def get_features(self):
        """Return the names of all capabilities not in NON_FEATURE_CAPABILITIES."""
        return [c.name for c in CAPABILITY
                if c not in NON_FEATURE_CAPABILITIES]

    def count_devices(self):
        """Return the number of descriptors currently reported."""
        return len(list(get_descriptors()))

    def refresh(self):
        """Re-scan devices and return the device information dict.

        Returns None (and forgets the current descriptor) unless exactly
        one device is present. Also returns None if a newly seen device
        cannot be opened. The device is only re-opened when its
        fingerprint differs from the remembered descriptor's.
        """
        descriptors = list(get_descriptors())
        if len(descriptors) != 1:
            self._descriptor = None
            return None

        desc = descriptors[0]
        known_fingerprint = (
            self._descriptor.fingerprint if self._descriptor else None)
        if desc.fingerprint != known_fingerprint:
            dev = desc.open_device()
            if not dev:
                return None
            self._descriptor = desc
            self._dev_info = {
                'name': dev.device_name,
                'version': '.'.join(str(x) for x in dev.version),
                'serial': dev.serial or '',
                'enabled': [c.name for c in CAPABILITY if c & dev.enabled],
                'connections': [
                    t.name for t in TRANSPORT if t & dev.capabilities],
            }

        return self._dev_info

    # ------------------------------------------------------------------
    # OATH credentials (CCID)
    # ------------------------------------------------------------------

    def refresh_credentials(self, timestamp, password_key=None):
        """Return all non-hidden credentials, as dicts, for *timestamp*."""
        return [c.to_dict()
                for c in self._calculate_all(timestamp, password_key)]

    def calculate(self, credential, timestamp, password_key):
        """Calculate one credential (given as a dict) and return it as a dict."""
        cred = Credential.from_dict(credential)
        return self._calculate(cred, timestamp, password_key).to_dict()

    def needs_validation(self):
        """Return the controller's ``locked`` flag, or False on any error."""
        try:
            return self._open_oath().locked
        except Exception:
            # Best-effort probe: any failure (including no device being
            # selected) is reported as "no validation needed".
            return False

    def get_oath_id(self):
        """Return the OATH controller id as a hex string."""
        return b2a_hex(self._open_oath().id).decode('utf-8')

    def derive_key(self, password):
        """Derive a key from *password* and the controller id; return hex."""
        controller = self._open_oath()
        key = derive_key(controller.id, password)
        return b2a_hex(key).decode('utf-8')

    def validate(self, key):
        """Validate the controller with the hex-encoded *key*.

        Returns True when validation succeeds, False when it raises, and
        None when *key* is None (no validation is attempted).
        """
        controller = self._open_oath()
        if key is None:
            return None
        try:
            controller.validate(a2b_hex(key))
        except Exception:
            return False
        return True

    def set_password(self, new_password, password_key):
        """Set the OATH password, or clear it when *new_password* is None."""
        controller = self._open_unlocked_oath(password_key)
        if new_password is not None:
            controller.set_password(derive_key(controller.id, new_password))
        else:
            controller.clear_password()

    def add_credential(self, name, key, oath_type, digits, algo, touch,
                       password_key):
        """Store a new OATH credential.

        Returns the error message as a string if *key* cannot be parsed
        by ``parse_b32_key``; otherwise returns None.
        """
        controller = self._open_unlocked_oath(password_key)
        try:
            key = parse_b32_key(key)
        except Exception as e:
            return str(e)
        controller.put(
            key, name, oath_type, digits, algo=algo, require_touch=touch)
        return None

    def delete_credential(self, credential, password_key):
        """Delete the credential described by the dict *credential*."""
        controller = self._open_unlocked_oath(password_key)
        controller.delete(Credential.from_dict(credential))

    def reset(self):
        """Call ``reset()`` on the OATH controller."""
        self._open_oath().reset()

    # ------------------------------------------------------------------
    # Slot-based credentials (OTP)
    # ------------------------------------------------------------------

    def calculate_slot_mode(self, slot, digits, timestamp):
        """Calculate a TOTP code from *slot*, waiting for touch; return a dict."""
        dev = self._descriptor.open_device(TRANSPORT.OTP)
        code = dev.driver.calculate(
            slot, challenge=timestamp, totp=True, digits=int(digits),
            wait_for_touch=True)
        return Credential(
            self._slot_name(slot), code=code, oath_type='totp', touch=True,
            algo='SHA1', expiration=self._expiration(timestamp)).to_dict()

    def refresh_slot_credentials(self, slots, digits, timestamp):
        """Return credential dicts for the enabled slots.

        *slots* and *digits* are two-element sequences; index 0 refers to
        slot 1 and index 1 to slot 2. A slot is read only when its entry
        in *slots* is truthy, and is skipped if it cannot be read.
        """
        result = []
        for index, slot in enumerate((1, 2)):
            if not slots[index]:
                continue
            cred = self._read_slot_cred(slot, digits[index], timestamp)
            if cred:
                result.append(cred)
        return [c.to_dict() for c in result]

    def add_slot_credential(self, slot, key, touch):
        """Program *slot* with the base32-encoded TOTP *key*.

        Raises:
            ValueError: if the key is between 21 and 64 bytes long.
        """
        # Imported locally: the hashing branch needs hashlib, which the
        # module does not import at file level.
        import hashlib

        dev = self._descriptor.open_device(TRANSPORT.OTP)
        key = parse_b32_key(key)
        if len(key) > 64:  # Keys longer than 64 bytes are hashed.
            key = hashlib.sha1(key).digest()
        if len(key) > 20:
            raise ValueError(
                'YubiKey Slots cannot handle TOTP keys over 20 bytes.')
        key += b'\x00' * (20 - len(key))  # Keys must be padded to 20 bytes.
        dev.driver.program_chalresp(int(slot), key, touch)

    def delete_slot_credential(self, slot):
        """Call ``zap_slot`` on the OTP driver for *slot*."""
        dev = self._descriptor.open_device(TRANSPORT.OTP)
        dev.driver.zap_slot(slot)

    # ------------------------------------------------------------------
    # QR codes
    # ------------------------------------------------------------------

    def parse_qr(self, screenshot):
        """Parse the first QR code found in *screenshot*.

        *screenshot* is a dict with ``data`` (base64-encoded pixels),
        ``width`` and ``height``. Returns the result of ``parse_uri`` for
        the first QR code found, or None when there is none.
        """
        data = b64decode(screenshot['data'])
        image = PixelImage(data, screenshot['width'], screenshot['height'])
        for qr in qrparse.parse_qr_codes(image, 2):
            # Only the first detected code is used.
            return parse_uri(qrdecode.decode_qr_data(qr))
        return None
2017-02-15 12:13:57 +03:00
class PixelImage(object):
    """Flat pixel buffer addressed by row.

    Holds the raw ``data`` sequence together with the ``width`` and
    ``height`` it was created with.
    """

    def __init__(self, data, width, height):
        self.data, self.width, self.height = data, width, height

    def get_line(self, line_number):
        """Return the ``width``-long slice of ``data`` for row *line_number*."""
        start = self.width * line_number
        stop = self.width * (line_number + 1)
        return self.data[start:stop]
# Module-level Controller instance, created when this module is imported.
controller = Controller()