bdd: move Response classes in own file and simplify

Removes most of the duplicated parse functions, introduces
a common assert_field function with a more expressive error
message.
This commit is contained in:
Sarah Hoffmann 2021-01-04 22:18:01 +01:00
parent 2712c5f90e
commit 8a93f8ed94
3 changed files with 201 additions and 213 deletions

View File

@ -0,0 +1,194 @@
"""
Classes wrapping HTTP responses from the Nominatim API.
"""
from collections import OrderedDict
import re
import json
import xml.etree.ElementTree as ET
from check_functions import Almost
def _geojson_result_to_json_result(geojson_result):
result = geojson_result['properties']
result['geojson'] = geojson_result['geometry']
if 'bbox' in geojson_result:
# bbox is minlon, minlat, maxlon, maxlat
# boundingbox is minlat, maxlat, minlon, maxlon
result['boundingbox'] = [geojson_result['bbox'][1],
geojson_result['bbox'][3],
geojson_result['bbox'][0],
geojson_result['bbox'][2]]
return result
class BadRowValueAssert:
""" Lazily formatted message for failures to find a field content.
"""
def __init__(self, response, idx, field, value):
self.idx = idx
self.field = field
self.value = value
self.row = response.result[idx]
def __str__(self):
return "\nBad value for row {} field '{}'. Expected: {}, got: {}.\nFull row: {}"""\
.format(self.idx, self.field, self.value,
self.row[self.field], json.dumps(self.row, indent=4))
class GenericResponse:
""" Common base class for all API responses.
"""
def __init__(self, page, fmt, errorcode=200):
self.page = page
self.format = fmt
self.errorcode = errorcode
self.result = []
self.header = dict()
if errorcode == 200:
getattr(self, '_parse_' + fmt)()
def _parse_json(self):
m = re.fullmatch(r'([\w$][^(]*)\((.*)\)', self.page)
if m is None:
code = self.page
else:
code = m.group(2)
self.header['json_func'] = m.group(1)
self.result = json.JSONDecoder(object_pairs_hook=OrderedDict).decode(code)
if isinstance(self.result, OrderedDict):
self.result = [self.result]
def _parse_geojson(self):
self._parse_json()
if 'error' in self.result[0]:
self.result = []
else:
self.result = list(map(_geojson_result_to_json_result, self.result[0]['features']))
def _parse_geocodejson(self):
self._parse_geojson()
if self.result is not None:
self.result = [r['geocoding'] for r in self.result]
def assert_field(self, idx, field, value):
""" Check that result row `idx` has a field `field` with value `value`.
Float numbers are matched approximately. When the expected value
starts with a carat, regular expression matching is used.
"""
assert field in self.result[idx], \
"Result row {} has no field '{}'.\nFull row: {}"\
.format(idx, field, json.dumps(self.result[idx], indent=4))
if isinstance(value, float):
assert Almost(value) == float(self.result[idx][field]), \
BadRowValueAssert(self, idx, field, value)
elif value.startswith("^"):
assert re.fullmatch(value, self.result[idx][field]), \
BadRowValueAssert(self, idx, field, value)
else:
assert str(self.result[idx][field]) == str(value), \
BadRowValueAssert(self, idx, field, value)
def match_row(self, row):
""" Match the result fields against the given behave table row.
"""
if 'ID' in row.headings:
todo = [int(row['ID'])]
else:
todo = range(len(self.result))
for i in todo:
for name, value in zip(row.headings, row.cells):
if name == 'ID':
pass
elif name == 'osm':
self.assert_field(i, 'osm_type', value[0])
self.assert_field(i, 'osm_id', value[1:])
elif name == 'centroid':
lon, lat = value.split(' ')
self.assert_field(i, 'lat', float(lat))
self.assert_field(i, 'lon', float(lon))
else:
self.assert_field(i, name, value)
def property_list(self, prop):
return [x[prop] for x in self.result]
class SearchResponse(GenericResponse):
""" Specialised class for search and lookup responses.
Transforms the xml response in a format similar to json.
"""
def _parse_xml(self):
xml_tree = ET.fromstring(self.page)
self.header = dict(xml_tree.attrib)
for child in xml_tree:
assert child.tag == "place"
self.result.append(dict(child.attrib))
address = {}
for sub in child:
if sub.tag == 'extratags':
self.result[-1]['extratags'] = {}
for tag in sub:
self.result[-1]['extratags'][tag.attrib['key']] = tag.attrib['value']
elif sub.tag == 'namedetails':
self.result[-1]['namedetails'] = {}
for tag in sub:
self.result[-1]['namedetails'][tag.attrib['desc']] = tag.text
elif sub.tag == 'geokml':
self.result[-1][sub.tag] = True
else:
address[sub.tag] = sub.text
if address:
self.result[-1]['address'] = address
class ReverseResponse(GenericResponse):
""" Specialised class for reverse responses.
Transforms the xml response in a format similar to json.
"""
def _parse_xml(self):
xml_tree = ET.fromstring(self.page)
self.header = dict(xml_tree.attrib)
self.result = []
for child in xml_tree:
if child.tag == 'result':
assert not self.result, "More than one result in reverse result"
self.result.append(dict(child.attrib))
elif child.tag == 'addressparts':
address = {}
for sub in child:
address[sub.tag] = sub.text
self.result[0]['address'] = address
elif child.tag == 'extratags':
self.result[0]['extratags'] = {}
for tag in child:
self.result[0]['extratags'][tag.attrib['key']] = tag.attrib['value']
elif child.tag == 'namedetails':
self.result[0]['namedetails'] = {}
for tag in child:
self.result[0]['namedetails'][tag.attrib['desc']] = tag.text
elif child.tag == 'geokml':
self.result[0][child.tag] = True
else:
assert child.tag == 'error', \
"Unknown XML tag {} on page: {}".format(child.tag, self.page)
class StatusResponse(GenericResponse):
""" Specialised class for status responses.
Can also parse text responses.
"""
def _parse_text(self):
pass

View File

@ -1,22 +1,18 @@
""" Steps that run search queries.
""" Steps that run queries against the API.
Queries may either be run directly via PHP using the query script
or via the HTTP interface.
or via the HTTP interface using php-cgi.
"""
import json
import os
import io
import re
import logging
import xml.etree.ElementTree as ET
from urllib.parse import urlencode
from collections import OrderedDict
from check_functions import Almost
from utils import run_script
from http_responses import GenericResponse, SearchResponse, ReverseResponse, StatusResponse
logger = logging.getLogger(__name__)
LOG = logging.getLogger(__name__)
BASE_SERVER_ENV = {
'HTTP_HOST' : 'localhost',
@ -56,208 +52,6 @@ def compare(operator, op1, op2):
else:
raise Exception("unknown operator '%s'" % operator)
class GenericResponse(object):
def match_row(self, row):
if 'ID' in row.headings:
todo = [int(row['ID'])]
else:
todo = range(len(self.result))
for i in todo:
res = self.result[i]
for h in row.headings:
if h == 'ID':
pass
elif h == 'osm':
assert res['osm_type'] == row[h][0]
assert res['osm_id'] == int(row[h][1:])
elif h == 'centroid':
x, y = row[h].split(' ')
assert Almost(float(y)) == float(res['lat'])
assert Almost(float(x)) == float(res['lon'])
elif row[h].startswith("^"):
assert h in res
assert re.fullmatch(row[h], res[h]) is not None, \
"attribute '%s': expected: '%s', got '%s'" % (h, row[h], res[h])
else:
assert h in res
assert str(res[h]) == str(row[h])
def property_list(self, prop):
return [ x[prop] for x in self.result ]
class SearchResponse(GenericResponse):
def __init__(self, page, fmt='json', errorcode=200):
self.page = page
self.format = fmt
self.errorcode = errorcode
self.result = []
self.header = dict()
if errorcode == 200:
getattr(self, 'parse_' + fmt)()
def parse_json(self):
m = re.fullmatch(r'([\w$][^(]*)\((.*)\)', self.page)
if m is None:
code = self.page
else:
code = m.group(2)
self.header['json_func'] = m.group(1)
self.result = json.JSONDecoder(object_pairs_hook=OrderedDict).decode(code)
def parse_geojson(self):
self.parse_json()
self.result = geojson_results_to_json_results(self.result)
def parse_geocodejson(self):
self.parse_geojson()
if self.result is not None:
self.result = [r['geocoding'] for r in self.result]
def parse_xml(self):
et = ET.fromstring(self.page)
self.header = dict(et.attrib)
for child in et:
assert child.tag == "place"
self.result.append(dict(child.attrib))
address = {}
for sub in child:
if sub.tag == 'extratags':
self.result[-1]['extratags'] = {}
for tag in sub:
self.result[-1]['extratags'][tag.attrib['key']] = tag.attrib['value']
elif sub.tag == 'namedetails':
self.result[-1]['namedetails'] = {}
for tag in sub:
self.result[-1]['namedetails'][tag.attrib['desc']] = tag.text
elif sub.tag in ('geokml'):
self.result[-1][sub.tag] = True
else:
address[sub.tag] = sub.text
if len(address) > 0:
self.result[-1]['address'] = address
class ReverseResponse(GenericResponse):
def __init__(self, page, fmt='json', errorcode=200):
self.page = page
self.format = fmt
self.errorcode = errorcode
self.result = []
self.header = dict()
if errorcode == 200:
getattr(self, 'parse_' + fmt)()
def parse_json(self):
m = re.fullmatch(r'([\w$][^(]*)\((.*)\)', self.page)
if m is None:
code = self.page
else:
code = m.group(2)
self.header['json_func'] = m.group(1)
self.result = [json.JSONDecoder(object_pairs_hook=OrderedDict).decode(code)]
def parse_geojson(self):
self.parse_json()
if 'error' in self.result:
return
self.result = geojson_results_to_json_results(self.result[0])
def parse_geocodejson(self):
self.parse_geojson()
if self.result is not None:
self.result = [r['geocoding'] for r in self.result]
def parse_xml(self):
et = ET.fromstring(self.page)
self.header = dict(et.attrib)
self.result = []
for child in et:
if child.tag == 'result':
assert len(self.result) == 0, "More than one result in reverse result"
self.result.append(dict(child.attrib))
elif child.tag == 'addressparts':
address = {}
for sub in child:
address[sub.tag] = sub.text
self.result[0]['address'] = address
elif child.tag == 'extratags':
self.result[0]['extratags'] = {}
for tag in child:
self.result[0]['extratags'][tag.attrib['key']] = tag.attrib['value']
elif child.tag == 'namedetails':
self.result[0]['namedetails'] = {}
for tag in child:
self.result[0]['namedetails'][tag.attrib['desc']] = tag.text
elif child.tag in ('geokml'):
self.result[0][child.tag] = True
else:
assert child.tag == 'error', \
"Unknown XML tag %s on page: %s" % (child.tag, self.page)
class DetailsResponse(GenericResponse):
def __init__(self, page, fmt='json', errorcode=200):
self.page = page
self.format = fmt
self.errorcode = errorcode
self.result = []
self.header = dict()
if errorcode == 200:
getattr(self, 'parse_' + fmt)()
def parse_json(self):
self.result = [json.JSONDecoder(object_pairs_hook=OrderedDict).decode(self.page)]
class StatusResponse(GenericResponse):
def __init__(self, page, fmt='text', errorcode=200):
self.page = page
self.format = fmt
self.errorcode = errorcode
if errorcode == 200 and fmt != 'text':
getattr(self, 'parse_' + fmt)()
def parse_json(self):
self.result = [json.JSONDecoder(object_pairs_hook=OrderedDict).decode(self.page)]
def geojson_result_to_json_result(geojson_result):
result = geojson_result['properties']
result['geojson'] = geojson_result['geometry']
if 'bbox' in geojson_result:
# bbox is minlon, minlat, maxlon, maxlat
# boundingbox is minlat, maxlat, minlon, maxlon
result['boundingbox'] = [
geojson_result['bbox'][1],
geojson_result['bbox'][3],
geojson_result['bbox'][0],
geojson_result['bbox'][2]
]
return result
def geojson_results_to_json_results(geojson_results):
if 'error' in geojson_results:
return
return list(map(geojson_result_to_json_result, geojson_results['features']))
@when(u'searching for "(?P<query>.*)"(?P<dups> with dups)?')
def query_cmd(context, query, dups):
@ -301,7 +95,7 @@ def send_api_query(endpoint, params, fmt, context):
env['SCRIPT_FILENAME'] = os.path.join(env['CONTEXT_DOCUMENT_ROOT'],
'%s.php' % endpoint)
logger.debug("Environment:" + json.dumps(env, sort_keys=True, indent=2))
LOG.debug("Environment:" + json.dumps(env, sort_keys=True, indent=2))
if hasattr(context, 'http_headers'):
env.update(context.http_headers)
@ -395,7 +189,7 @@ def website_details_request(context, fmt, query):
else:
outfmt = fmt.strip()
context.response = DetailsResponse(outp, outfmt, status)
context.response = GenericResponse(outp, outfmt, status)
@when(u'sending (?P<fmt>\S+ )?lookup query for (?P<query>.*)')
def website_lookup_request(context, fmt, query):

View File

@ -35,7 +35,7 @@ def define_node_grid(context, grid_step):
grid_step = 0.00001
context.osm.set_grid([context.table.headings] + [list(h) for h in context.table],
grid_step)
grid_step)
@when(u'loading osm data')