diff --git a/pybootd/pxed.py b/pybootd/pxed.py index c1e80b6..796e845 100644 --- a/pybootd/pxed.py +++ b/pybootd/pxed.py @@ -29,9 +29,10 @@ from binascii import hexlify +from collections import OrderedDict from os import stat from os.path import realpath, join as joinpath -from re import compile as recompile +from re import compile as recompile, sub as resub from select import select from socket import (if_nametoindex, inet_aton, inet_ntoa, socket, AF_INET, SOCK_DGRAM, IPPROTO_UDP, IPPROTO_IP, SOL_SOCKET, @@ -40,10 +41,11 @@ from struct import calcsize as scalc, pack as spack, unpack as sunpack from sys import platform from time import sleep from traceback import format_exc -from typing import Optional +from typing import Optional, Tuple, Union from urllib.error import HTTPError, URLError from urllib.parse import urlencode, urlsplit, urlunsplit from urllib.request import urlopen +from uuid import UUID from netifaces import ifaddresses, interfaces from .tftpd import TftpServer from .util import hexline, to_bool, iptoint, inttoip, get_iface_config @@ -201,7 +203,7 @@ class BootpServer: Implements bootstrap protocol. """ - ACCESS_LOCAL = ['uuid', 'mac'] # Access modes, defined locally + ACCESS_LOCAL = {'uuid': 128, 'mac': 48} # Access modes, defined locally ACCESS_REMOTE = ['http'] # Access modes, remotely retrieved (ST_IDLE, ST_PXE, ST_DHCP) = range(3) # Current state @@ -249,20 +251,22 @@ class BootpServer: self.acl = None else: access = access.lower() - if access not in self.ACCESS_LOCAL + self.ACCESS_REMOTE: + if access not in list(self.ACCESS_LOCAL) + self.ACCESS_REMOTE: raise BootpError('Invalid access mode: %s' % access) if not self.config.has_section(access): raise BootpError("Missing access section '%s'" % access) - self.acl = {} + self.acl = OrderedDict() if access in self.ACCESS_LOCAL: for entry in self.config.options(access): - self.acl[entry.upper().replace('-', ':')] = \ - to_bool(self.config.get(access, entry)) - self.buggy_clients = set() + acl_builder = getattr(self, 'build_%s_acl' % access) + kent = acl_builder(entry) + self.acl[kent] = to_bool(self.config.get(access, entry)) + self.buggy_clients = OrderedDict() if self.config.options(self.BUGGY_CLIENT_SECTION): for entry in self.config.options(self.BUGGY_CLIENT_SECTION): - if to_bool(self.config.get(self.BUGGY_CLIENT_SECTION, entry)): - self.buggy_clients.add(entry.upper().replace('-', ':')) + item = self.build_mac_acl(entry) + self.buggy_clients[item] = \ + to_bool(self.config.get(self.BUGGY_CLIENT_SECTION, entry)) self.boot_files = dict() if not self.config.options(self.BOOT_FILE_SECTION): raise BootpError("Mising '%s' section" % self.BOOT_FILE_SECTION) @@ -277,8 +281,11 @@ class BootpServer: for mac_str, ip_str in config.items('static_dhcp'): mac_key = mac_str.upper().replace('-', ':') self.ippool[mac_key] = ip_str - if access == 'mac' and mac_str not in self.acl: - self.acl[mac_key] = True + mac = int(resub('[-:]', '', mac_str), 16) + mask = (1 << self.ACCESS_LOCAL['mac']) - 1 + access_key = (mac, mask) + if access == 'mac' and access_key not in self.acl: + self.acl[access_key] = True self.access = access self._resume = False @@ -319,6 +326,70 @@ class BootpServer: def is_url(path): return bool(urlsplit(path).scheme) + @classmethod + def build_mac_acl(cls, entry: str) -> Tuple[int, int]: + parts = entry.split('/', 1) + values = [] + bitcount = cls.ACCESS_LOCAL['mac'] + maxval = (1 << bitcount) - 1 + for mask, part in enumerate(parts): + try: + if mask: + value = maxval & ~((1 << int(part)) - 1) + else: + part = resub('[-:]', '', part) + value = int(part, 16) + value <<= bitcount - len(part)*4 + if not 0 <= value <= maxval: + raise ValueError() + values.append(value) + except Exception: + raise ValueError('Invalid ACL value: %s' % entry) + if len(values) < 2: + values.append(maxval) + return tuple(values) + + @classmethod + def build_uuid_acl(cls, entry: str) -> Tuple[int, int]: + parts = entry.split('/', 1) + values = [] + bitcount = cls.ACCESS_LOCAL['uuid'] + maxval = (1 << bitcount) - 1 + for part in parts: + try: + value = UUID('{%s}' % part).int + if not 0 <= value <= maxval: + raise ValueError() + values.append(value) + except Exception: + raise ValueError('Invalid ACL value: %s' % entry) + if len(values) < 2: + values.append(maxval) + return tuple(values) + + @classmethod + def check_acl(cls, acl: dict, access: str, value: Union[bytes, UUID]) \ + -> Union[bool, None]: + width = cls.ACCESS_LOCAL[access] + if access == 'mac': + ival = int(hexlify(value), 16) + else: + ival = value.int + access_key = (ival, (1 << width) - 1) + if access_key in acl: + # try direct match + result = acl[access_key] + else: + # find matching filter + result = None + for val, mask in acl: + if ival & mask == val & mask: + result = acl[(val, mask)] + break + if result: + return True + return result + def get_netconfig(self): return self.netconfig @@ -490,20 +561,21 @@ class BootpServer: server_addr = self.netconfig['server'] mac_addr = buf[BOOTP_CHADDR][:6] + identifiers = {'mac': mac_addr} mac_str = ':'.join(['%02X' % x for x in mac_addr]) # is the UUID received (PXE mode) if 97 in options and len(options[97]) == 17: - uuid = options[97][1:] + uuid = UUID(bytes=options[97][1:]) + identifiers['uuid'] = uuid pxe = True self.log.debug('PXE UUID has been received') # or retrieved from the cache (DHCP mode) else: uuid = self.uuidpool.get(mac_addr, None) + identifiers['uuid'] = uuid pxe = False self.log.debug('PXE UUID not present in request') - uuid_str = (':'.join(hexlify(x).decode() for x in - (uuid[0:4], uuid[4:6], uuid[6:8], uuid[8:10], - uuid[10:16]))).upper() if uuid else None + uuid_str = str(uuid) if uuid else None if uuid_str: self.log.info('UUID is %s for MAC %s', uuid_str, mac_str) @@ -554,7 +626,7 @@ class BootpServer: parameters['uuid'] = uuid_str if not pxe and mac_str in self.ippool: parameters['ip'] = self.ippool[mac_str] - item = uuid_str or mac_str + item_str = uuid_str or mac_str # only bother the authentication host when a state change is # required. checkhost = currentstate != newstate @@ -586,26 +658,30 @@ class BootpServer: self.log.critical('Internal error: %s' % exc) self.states[mac_str] = self.ST_IDLE return - # local access is only validated if mac address is not yet known elif mac_str not in self.ippool: - item = locals()['%s_str' % self.access] + # local access is only validated if mac addr is not yet known + item = identifiers.get(self.access, None) if not item: self.log.info('Missing %s identifier, ' 'ignoring %s request' % (self.access, mac_str)) return - if item not in self.acl: - self.log.info('%s is not in ACL list, ' - 'ignoring %s request' % (item, mac_str)) - return - if not self.acl[item]: - self.log.info('%s access is disabled, ' - 'ignoring %s request' % (item, mac_str)) + result = self.check_acl(self.acl, self.access, item) + if uuid: + item_str = '/'.join((uuid_str, mac_str)) + else: + item_str = mac_addr + if not result: + if result is not None: + self.log.info('%s access in ACL is disabled', item_str) + else: + self.log.info('%s is not in ACL list', item_str) return else: - item = locals()['%s_str' % self.access] + # mac is registered, that is already authorized + item_str = mac_str self.log.info('%s access is authorized, ' - 'request will be satisfied' % item) + 'request will be satisfied' % item_str) if 55 in options: for opt in options[55]: @@ -776,10 +852,12 @@ class BootpServer: if pxe: self.uuidpool[mac_addr] = uuid - if mac_str in self.buggy_clients: - self.log.debug('Force global broadcast for buggy client %s', + if self.check_acl(self.buggy_clients, 'mac', mac_addr): + self.log.error('Force global broadcast for buggy client %s', mac_str) addr = ('255.255.255.255', addr[1]) + else: + self.log.error('Not buggy') # send the response sock.sendto(pkt, addr)