1
1
mirror of https://github.com/eblot/pybootd.git synced 2024-08-17 18:00:33 +03:00

Add MAC and UUID mask support to simplify configuration files

This commit is contained in:
Emmanuel Blot 2020-04-14 16:41:56 +02:00
parent 02955ba712
commit 91f371b24a

View File

@ -29,9 +29,10 @@
from binascii import hexlify
from collections import OrderedDict
from os import stat
from os.path import realpath, join as joinpath
from re import compile as recompile
from re import compile as recompile, sub as resub
from select import select
from socket import (if_nametoindex, inet_aton, inet_ntoa, socket,
AF_INET, SOCK_DGRAM, IPPROTO_UDP, IPPROTO_IP, SOL_SOCKET,
@ -40,10 +41,11 @@ from struct import calcsize as scalc, pack as spack, unpack as sunpack
from sys import platform
from time import sleep
from traceback import format_exc
from typing import Optional
from typing import Optional, Tuple, Union
from urllib.error import HTTPError, URLError
from urllib.parse import urlencode, urlsplit, urlunsplit
from urllib.request import urlopen
from uuid import UUID
from netifaces import ifaddresses, interfaces
from .tftpd import TftpServer
from .util import hexline, to_bool, iptoint, inttoip, get_iface_config
@ -201,7 +203,7 @@ class BootpServer:
Implements bootstrap protocol.
"""
ACCESS_LOCAL = ['uuid', 'mac'] # Access modes, defined locally
ACCESS_LOCAL = {'uuid': 128, 'mac': 48} # Access modes, defined locally
ACCESS_REMOTE = ['http'] # Access modes, remotely retrieved
(ST_IDLE, ST_PXE, ST_DHCP) = range(3) # Current state
@ -249,20 +251,22 @@ class BootpServer:
self.acl = None
else:
access = access.lower()
if access not in self.ACCESS_LOCAL + self.ACCESS_REMOTE:
if access not in list(self.ACCESS_LOCAL) + self.ACCESS_REMOTE:
raise BootpError('Invalid access mode: %s' % access)
if not self.config.has_section(access):
raise BootpError("Missing access section '%s'" % access)
self.acl = {}
self.acl = OrderedDict()
if access in self.ACCESS_LOCAL:
for entry in self.config.options(access):
self.acl[entry.upper().replace('-', ':')] = \
to_bool(self.config.get(access, entry))
self.buggy_clients = set()
acl_builder = getattr(self, 'build_%s_acl' % access)
kent = acl_builder(entry)
self.acl[kent] = to_bool(self.config.get(access, entry))
self.buggy_clients = OrderedDict()
if self.config.options(self.BUGGY_CLIENT_SECTION):
for entry in self.config.options(self.BUGGY_CLIENT_SECTION):
if to_bool(self.config.get(self.BUGGY_CLIENT_SECTION, entry)):
self.buggy_clients.add(entry.upper().replace('-', ':'))
item = self.build_mac_acl(entry)
self.buggy_clients[item] = \
to_bool(self.config.get(self.BUGGY_CLIENT_SECTION, entry))
self.boot_files = dict()
if not self.config.options(self.BOOT_FILE_SECTION):
raise BootpError("Mising '%s' section" % self.BOOT_FILE_SECTION)
@ -277,8 +281,11 @@ class BootpServer:
for mac_str, ip_str in config.items('static_dhcp'):
mac_key = mac_str.upper().replace('-', ':')
self.ippool[mac_key] = ip_str
if access == 'mac' and mac_str not in self.acl:
self.acl[mac_key] = True
mac = int(resub('[-:]', '', mac_str), 16)
mask = (1 << self.ACCESS_LOCAL['mac']) - 1
access_key = (mac, mask)
if access == 'mac' and access_key not in self.acl:
self.acl[access_key] = True
self.access = access
self._resume = False
@ -319,6 +326,70 @@ class BootpServer:
def is_url(path):
return bool(urlsplit(path).scheme)
@classmethod
def build_mac_acl(cls, entry: str) -> Tuple[int, int]:
parts = entry.split('/', 1)
values = []
bitcount = cls.ACCESS_LOCAL['mac']
maxval = (1 << bitcount) - 1
for mask, part in enumerate(parts):
try:
if mask:
value = maxval & ~((1 << int(part)) - 1)
else:
part = resub('[-:]', '', part)
value = int(part, 16)
value <<= bitcount - len(part)*4
if not 0 <= value <= maxval:
raise ValueError()
values.append(value)
except Exception:
raise ValueError('Invalid ACL value: %s' % entry)
if len(values) < 2:
values.append(maxval)
return tuple(values)
@classmethod
def build_uuid_acl(cls, entry: str) -> Tuple[int, int]:
parts = entry.split('/', 1)
values = []
bitcount = cls.ACCESS_LOCAL['uuid']
maxval = (1 << bitcount) - 1
for part in parts:
try:
value = UUID('{%s}' % part).int
if not 0 <= value <= maxval:
raise ValueError()
values.append(value)
except Exception:
raise ValueError('Invalid ACL value: %s' % entry)
if len(values) < 2:
values.append(maxval)
return tuple(values)
@classmethod
def check_acl(cls, acl: dict, access: str, value: Union[bytes, UUID]) \
-> Union[bool, None]:
width = cls.ACCESS_LOCAL[access]
if access == 'mac':
ival = int(hexlify(value), 16)
else:
ival = value.int
access_key = (ival, (1 << width) - 1)
if access_key in acl:
# try direct match
result = acl[access_key]
else:
# find matching filter
result = None
for val, mask in acl:
if ival & mask == val & mask:
result = acl[(val, mask)]
break
if result:
return True
return result
def get_netconfig(self):
return self.netconfig
@ -490,20 +561,21 @@ class BootpServer:
server_addr = self.netconfig['server']
mac_addr = buf[BOOTP_CHADDR][:6]
identifiers = {'mac': mac_addr}
mac_str = ':'.join(['%02X' % x for x in mac_addr])
# is the UUID received (PXE mode)
if 97 in options and len(options[97]) == 17:
uuid = options[97][1:]
uuid = UUID(bytes=options[97][1:])
identifiers['uuid'] = uuid
pxe = True
self.log.debug('PXE UUID has been received')
# or retrieved from the cache (DHCP mode)
else:
uuid = self.uuidpool.get(mac_addr, None)
identifiers['uuid'] = uuid
pxe = False
self.log.debug('PXE UUID not present in request')
uuid_str = (':'.join(hexlify(x).decode() for x in
(uuid[0:4], uuid[4:6], uuid[6:8], uuid[8:10],
uuid[10:16]))).upper() if uuid else None
uuid_str = str(uuid) if uuid else None
if uuid_str:
self.log.info('UUID is %s for MAC %s', uuid_str, mac_str)
@ -554,7 +626,7 @@ class BootpServer:
parameters['uuid'] = uuid_str
if not pxe and mac_str in self.ippool:
parameters['ip'] = self.ippool[mac_str]
item = uuid_str or mac_str
item_str = uuid_str or mac_str
# only bother the authentication host when a state change is
# required.
checkhost = currentstate != newstate
@ -586,26 +658,30 @@ class BootpServer:
self.log.critical('Internal error: %s' % exc)
self.states[mac_str] = self.ST_IDLE
return
# local access is only validated if mac address is not yet known
elif mac_str not in self.ippool:
item = locals()['%s_str' % self.access]
# local access is only validated if mac addr is not yet known
item = identifiers.get(self.access, None)
if not item:
self.log.info('Missing %s identifier, '
'ignoring %s request' %
(self.access, mac_str))
return
if item not in self.acl:
self.log.info('%s is not in ACL list, '
'ignoring %s request' % (item, mac_str))
return
if not self.acl[item]:
self.log.info('%s access is disabled, '
'ignoring %s request' % (item, mac_str))
result = self.check_acl(self.acl, self.access, item)
if uuid:
item_str = '/'.join((uuid_str, mac_str))
else:
item_str = mac_addr
if not result:
if result is not None:
self.log.info('%s access in ACL is disabled', item_str)
else:
self.log.info('%s is not in ACL list', item_str)
return
else:
item = locals()['%s_str' % self.access]
# mac is registered, that is already authorized
item_str = mac_str
self.log.info('%s access is authorized, '
'request will be satisfied' % item)
'request will be satisfied' % item_str)
if 55 in options:
for opt in options[55]:
@ -776,10 +852,12 @@ class BootpServer:
if pxe:
self.uuidpool[mac_addr] = uuid
if mac_str in self.buggy_clients:
self.log.debug('Force global broadcast for buggy client %s',
if self.check_acl(self.buggy_clients, 'mac', mac_addr):
self.log.error('Force global broadcast for buggy client %s',
mac_str)
addr = ('255.255.255.255', addr[1])
else:
self.log.error('Not buggy')
# send the response
sock.sendto(pkt, addr)