1
1
mirror of https://github.com/eblot/pybootd.git synced 2024-09-11 22:17:44 +03:00

Fix several type error issues

* bytes vs. int
 * string vs. bytes
This commit is contained in:
Emmanuel Blot 2019-09-05 11:06:14 +02:00
parent 02c32b412c
commit 23b8327ff1
3 changed files with 33 additions and 33 deletions

View File

@ -29,6 +29,8 @@ from .pxed import BootpServer
from .tftpd import TftpServer from .tftpd import TftpServer
from .util import logger_factory, EasyConfigParser from .util import logger_factory, EasyConfigParser
#pybootd: disable-msg=broad-except
class BootpDaemon(Thread): class BootpDaemon(Thread):

View File

@ -26,8 +26,14 @@ from socket import (inet_aton, inet_ntoa, socket,
from struct import calcsize as scalc, pack as spack, unpack as sunpack from struct import calcsize as scalc, pack as spack, unpack as sunpack
from time import sleep from time import sleep
from traceback import format_exc from traceback import format_exc
from urllib.error import HTTPError, URLError
from urllib.parse import urlencode, urlunsplit
from urllib.request import urlopen
from .util import hexline, to_bool, iptoint, inttoip, get_iface_config from .util import hexline, to_bool, iptoint, inttoip, get_iface_config
#pybootd: disable-msg=broad-except
BOOTP_PORT_REQUEST = 67 BOOTP_PORT_REQUEST = 67
BOOTP_PORT_REPLY = 68 BOOTP_PORT_REPLY = 68
@ -269,13 +275,13 @@ class BootpServer:
self.log.debug('Parsing DHCP options') self.log.debug('Parsing DHCP options')
dhcp_tags = {} dhcp_tags = {}
while tail: while tail:
tag = ord(tail[0]) tag = tail[0]
# padding # padding
if tag == 0: if tag == 0:
continue continue
if tag == 0xff: if tag == 0xff:
return dhcp_tags return dhcp_tags
length = ord(tail[1]) length = tail[1]
(value, ) = sunpack('!%ss' % length, tail[2:2+length]) (value, ) = sunpack('!%ss' % length, tail[2:2+length])
tail = tail[2+length:] tail = tail[2+length:]
try: try:
@ -289,40 +295,38 @@ class BootpServer:
dhcp_tags[tag] = value dhcp_tags[tag] = value
def build_pxe_options(self, options, server): def build_pxe_options(self, options, server):
buf = b''
try: try:
buf = ''
uuid = options[97] uuid = options[97]
buf += spack('!BB%ds' % len(uuid), buf += spack('!BB%ds' % len(uuid),
97, len(uuid), uuid) 97, len(uuid), uuid)
clientclass = options[60] clientclass = options[60]
clientclass = clientclass[:clientclass.find(':')] clientclass = clientclass[:clientclass.find(':')]
buf += spack('!BB%ds' % len(clientclass), buf += spack('!BB%ds' % len(clientclass),
60, len(clientclass), clientclass) 60, len(clientclass), clientclass)
vendor = '' vendor = ''
vendor += spack('!BBB', PXE_DISCOVERY_CONTROL, 1, 0x0A) vendor += spack('!BBB', PXE_DISCOVERY_CONTROL, 1, 0x0A)
vendor += spack('!BBHB4s', PXE_BOOT_SERVERS, 2+1+4, vendor += spack('!BBHB4s', PXE_BOOT_SERVERS, 2+1+4,
0, 1, server) 0, 1, server)
srvstr = 'Python' srvstr = 'Python'
vendor += spack('!BBHB%ds' % len(srvstr), PXE_BOOT_MENU, vendor += spack('!BBHB%ds' % len(srvstr), PXE_BOOT_MENU,
2+1+len(srvstr), 0, len(srvstr), srvstr) 2+1+len(srvstr), 0, len(srvstr), srvstr)
prompt = 'Stupid PXE' prompt = 'Stupid PXE'
vendor += spack('!BBB%ds' % len(prompt), PXE_MENU_PROMPT, vendor += spack('!BBB%ds' % len(prompt), PXE_MENU_PROMPT,
1+len(prompt), len(prompt), prompt) 1+len(prompt), len(prompt), prompt)
buf += spack('!BB%ds' % len(vendor), 43, buf += spack('!BB%ds' % len(vendor), 43,
len(vendor), vendor) len(vendor), vendor)
buf += spack('!BBB', 255, 0, 0) buf += spack('!BBB', 255, 0, 0)
return buf return buf
except KeyError as exc: except KeyError as exc:
self.log.error('Missing options, cancelling: %s' % exc) self.log.error('Missing options, cancelling: %s' % exc)
return None return b''
def build_dhcp_options(self, clientname): def build_dhcp_options(self, clientname):
buf = ''
if not clientname: if not clientname:
return buf return b''
buf += spack('!BB%ds' % len(clientname), return spack('!BB%ds' % len(clientname),
12, len(clientname), clientname) 12, len(clientname), clientname)
return buf
def handle(self, sock, addr, data): def handle(self, sock, addr, data):
self.log.info('Sender: %s on socket %s' % (addr, sock.getsockname())) self.log.info('Sender: %s on socket %s' % (addr, sock.getsockname()))
@ -340,13 +344,13 @@ class BootpServer:
# Extras (DHCP options) # Extras (DHCP options)
try: try:
dhcp_msg_type = ord(options[53][0]) dhcp_msg_type = options[53][0]
except KeyError: except KeyError:
dhcp_msg_type = None dhcp_msg_type = None
server_addr = self.netconfig['server'] server_addr = self.netconfig['server']
mac_addr = buf[BOOTP_CHADDR][:6] mac_addr = buf[BOOTP_CHADDR][:6]
mac_str = ':'.join(['%02X' % ord(x) for x in mac_addr]) mac_str = ':'.join(['%02X' % x for x in mac_addr])
# is the UUID received (PXE mode) # is the UUID received (PXE mode)
if 97 in options and len(options[97]) == 17: if 97 in options and len(options[97]) == 17:
uuid = options[97][1:] uuid = options[97][1:]
@ -401,8 +405,6 @@ class BootpServer:
# remote access is always validated on each request # remote access is always validated on each request
if self.access in self.ACCESS_REMOTE: if self.access in self.ACCESS_REMOTE:
# need to query a host to grant or reject access # need to query a host to grant or reject access
import urlparse
import urllib
netloc = self.config.get(self.access, 'location') netloc = self.config.get(self.access, 'location')
path = self.config.get(self.access, pxe and 'pxe' or 'dhcp') path = self.config.get(self.access, pxe and 'pxe' or 'dhcp')
timeout = int(self.config.get(self.access, 'timeout', '5')) timeout = int(self.config.get(self.access, 'timeout', '5'))
@ -419,14 +421,12 @@ class BootpServer:
if to_bool(always_check): if to_bool(always_check):
checkhost = True checkhost = True
if checkhost: if checkhost:
query = urllib.urlencode(parameters) query = urlencode(parameters)
urlparts = (self.access, netloc, path, query, '') urlparts = (self.access, netloc, path, query, '')
url = urlparse.urlunsplit(urlparts) url = urlunsplit(urlparts)
self.log.info('Requesting URL: %s' % url) self.log.info('Requesting URL: %s' % url)
import urllib2
import httplib
try: try:
up = urllib2.urlopen(url, timeout=timeout) up = urlopen(url, timeout=timeout)
for l in up: for l in up:
try: try:
# Look for extra definition within the reply # Look for extra definition within the reply
@ -438,18 +438,14 @@ class BootpServer:
filename = v filename = v
except ValueError: except ValueError:
pass pass
except urllib2.HTTPError as exc: except HTTPError as exc:
self.log.error('HTTP Error: %s' % exc) self.log.error('HTTP Error: %s' % exc)
self.states[mac_str] = self.ST_IDLE self.states[mac_str] = self.ST_IDLE
return return
except urllib2.URLError as exc: except URLError as exc:
self.log.critical('Internal error: %s' % exc) self.log.critical('Internal error: %s' % exc)
self.states[mac_str] = self.ST_IDLE self.states[mac_str] = self.ST_IDLE
return return
except httplib.HTTPException as exc:
self.log.error('Server error: %s' % type(exc))
self.states[mac_str] = self.ST_IDLE
return
# local access is only validated if mac address is not yet known # local access is only validated if mac address is not yet known
elif mac_str not in self.ippool: elif mac_str not in self.ippool:
item = locals()['%s_str' % self.access] item = locals()['%s_str' % self.access]
@ -516,10 +512,10 @@ class BootpServer:
'.'.join([self.config.get(self.bootp_section, '.'.join([self.config.get(self.bootp_section,
'servername', 'unknown'), 'servername', 'unknown'),
self.config.get(self.bootp_section, self.config.get(self.bootp_section,
'domain', 'localdomain')]) 'domain', 'localdomain')]).encode()
# file # file
buf[BOOTP_FILE] = self.config.get(self.bootp_section, buf[BOOTP_FILE] = self.config.get(self.bootp_section,
'boot_file', '\x00') 'boot_file', '\x00').encode()
if not dhcp_msg_type: if not dhcp_msg_type:
self.log.warn('No DHCP message type found, discarding request') self.log.warn('No DHCP message type found, discarding request')

View File

@ -33,6 +33,8 @@ from urllib.request import urlopen
from . import pybootd_path from . import pybootd_path
from .util import hexline from .util import hexline
#pybootd: disable-msg=broad-except
TFTP_PORT = 69 TFTP_PORT = 69
@ -122,7 +124,7 @@ class TftpConnection(object):
def parse(self, data, unpack=sunpack): def parse(self, data, unpack=sunpack):
self.log.debug('parse') self.log.debug('parse')
buf = buffer(data) buf = data
pkt = {} pkt = {}
opcode = pkt['opcode'] = unpack('!h', buf[:2])[0] opcode = pkt['opcode'] = unpack('!h', buf[:2])[0]
if (opcode == self.RRQ) or (opcode == self.WRQ): if (opcode == self.RRQ) or (opcode == self.WRQ):