mirror of
https://github.com/osm-search/Nominatim.git
synced 2024-11-27 10:43:02 +03:00
bdd: improve assert output for API query checks
Adds wrapper function for checking address parts and more explanation strings to asserts.
This commit is contained in:
parent
ee18a511c6
commit
74122dc965
@ -3,10 +3,35 @@ Collection of assertion functions used for the steps.
|
||||
"""
|
||||
|
||||
class Almost:
|
||||
|
||||
""" Compares a float value with a certain jitter.
|
||||
"""
|
||||
def __init__(self, value, offset=0.00001):
|
||||
self.value = value
|
||||
self.offset = offset
|
||||
|
||||
def __eq__(self, other):
|
||||
return abs(other - self.value) < self.offset
|
||||
|
||||
class Bbox:
|
||||
""" Comparator for bounding boxes.
|
||||
"""
|
||||
def __init__(self, bbox_string):
|
||||
self.coord = [float(x) for x in bbox_string.split(',')]
|
||||
|
||||
def __contains__(self, item):
|
||||
if isinstance(item, str):
|
||||
item = item.split(',')
|
||||
item = list(map(float, item))
|
||||
|
||||
if len(item) == 2:
|
||||
return self.coord[0] <= item[0] <= self.coord[2] \
|
||||
and self.coord[1] <= item[1] <= self.coord[3]
|
||||
|
||||
if len(item) == 4:
|
||||
return item[0] >= self.coord[0] and item[1] <= self.coord[1] \
|
||||
and item[2] >= self.coord[2] and item[3] <= self.coord[3]
|
||||
|
||||
raise ValueError("Not a coordinate or bbox.")
|
||||
|
||||
def __str__(self):
|
||||
return str(self.coord)
|
||||
|
@ -95,6 +95,29 @@ class GenericResponse:
|
||||
assert str(self.result[idx][field]) == str(value), \
|
||||
BadRowValueAssert(self, idx, field, value)
|
||||
|
||||
def assert_address_field(self, idx, field, value):
|
||||
""" Check that result rows`idx` has a field `field` with value `value`
|
||||
in its address. If idx is None, then all results are checked.
|
||||
"""
|
||||
if idx is None:
|
||||
todo = range(len(self.result))
|
||||
else:
|
||||
todo = [int(idx)]
|
||||
|
||||
for idx in todo:
|
||||
assert 'address' in self.result[idx], \
|
||||
"Result row {} has no field 'address'.\nFull row: {}"\
|
||||
.format(idx, json.dumps(self.result[idx], indent=4))
|
||||
|
||||
address = self.result[idx]['address']
|
||||
assert field in address, \
|
||||
"Result row {} has no field '{}' in address.\nFull address: {}"\
|
||||
.format(idx, field, json.dumps(address, indent=4))
|
||||
|
||||
assert address[field] == value, \
|
||||
"\nBad value for row {} field '{}' in address. Expected: {}, got: {}.\nFull address: {}"""\
|
||||
.format(idx, field, value, address[field], json.dumps(address, indent=4))
|
||||
|
||||
def match_row(self, row):
|
||||
""" Match the result fields against the given behave table row.
|
||||
"""
|
||||
|
@ -167,8 +167,11 @@ class NominatimEnvironment:
|
||||
self.test_env['NOMINATIM_WIKIPEDIA_DATA_PATH'] = str(testdata.resolve())
|
||||
|
||||
try:
|
||||
self.run_setup_script('all', 'import-tiger-data',
|
||||
osm_file=self.api_test_file)
|
||||
self.run_setup_script('all', osm_file=self.api_test_file)
|
||||
self.run_setup_script('import-tiger-data')
|
||||
|
||||
phrase_file = str((testdata / 'specialphrases_testdb.sql').resolve())
|
||||
run_script(['psql', '-d', self.api_test_db, '-f', phrase_file])
|
||||
except:
|
||||
self.db_drop_database(self.api_test_db)
|
||||
raise
|
||||
|
@ -11,6 +11,7 @@ from urllib.parse import urlencode
|
||||
|
||||
from utils import run_script
|
||||
from http_responses import GenericResponse, SearchResponse, ReverseResponse, StatusResponse
|
||||
from check_functions import Bbox
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
@ -193,7 +194,7 @@ def validate_result_number(context, operator, number):
|
||||
assert context.response.errorcode == 200
|
||||
numres = len(context.response.result)
|
||||
assert compare(operator, numres, int(number)), \
|
||||
"Bad number of results: expected %s %s, got %d." % (operator, number, numres)
|
||||
"Bad number of results: expected {} {}, got {}.".format(operator, number, numres)
|
||||
|
||||
@then(u'a HTTP (?P<status>\d+) is returned')
|
||||
def check_http_return_status(context, status):
|
||||
@ -261,18 +262,12 @@ def validate_attributes(context, lid, neg, attrs):
|
||||
def step_impl(context):
|
||||
context.execute_steps("then at least 1 result is returned")
|
||||
|
||||
if 'ID' not in context.table.headings:
|
||||
addr_parts = context.response.property_list('address')
|
||||
|
||||
for line in context.table:
|
||||
if 'ID' in context.table.headings:
|
||||
addr_parts = [dict(context.response.result[int(line['ID'])]['address'])]
|
||||
idx = int(line['ID']) if 'ID' in line.headings else None
|
||||
|
||||
for h in context.table.headings:
|
||||
if h != 'ID':
|
||||
for p in addr_parts:
|
||||
assert h in p
|
||||
assert p[h] == line[h], "Bad address value for %s" % h
|
||||
for name, value in zip(line.headings, line.cells):
|
||||
if name != 'ID':
|
||||
context.response.assert_address_field(idx, name, value)
|
||||
|
||||
@then(u'address of result (?P<lid>\d+) has(?P<neg> no)? types (?P<attrs>.*)')
|
||||
def check_address(context, lid, neg, attrs):
|
||||
@ -290,12 +285,11 @@ def check_address(context, lid, neg, attrs):
|
||||
def check_address(context, lid, complete):
|
||||
context.execute_steps("then more than %s results are returned" % lid)
|
||||
|
||||
addr_parts = dict(context.response.result[int(lid)]['address'])
|
||||
lid = int(lid)
|
||||
addr_parts = dict(context.response.result[lid]['address'])
|
||||
|
||||
for line in context.table:
|
||||
assert line['type'] in addr_parts
|
||||
assert addr_parts[line['type']] == line['value'], \
|
||||
"Bad address value for %s" % line['type']
|
||||
context.response.assert_address_field(lid, line['type'], line['value'])
|
||||
del addr_parts[line['type']]
|
||||
|
||||
if complete == 'is':
|
||||
@ -307,41 +301,30 @@ def step_impl(context, lid, coords):
|
||||
context.execute_steps("then at least 1 result is returned")
|
||||
bboxes = context.response.property_list('boundingbox')
|
||||
else:
|
||||
context.execute_steps("then more than %sresults are returned" % lid)
|
||||
bboxes = [ context.response.result[int(lid)]['boundingbox']]
|
||||
context.execute_steps("then more than {}results are returned".format(lid))
|
||||
bboxes = [context.response.result[int(lid)]['boundingbox']]
|
||||
|
||||
coord = [ float(x) for x in coords.split(',') ]
|
||||
expected = Bbox(coords)
|
||||
|
||||
for bbox in bboxes:
|
||||
if isinstance(bbox, str):
|
||||
bbox = bbox.split(',')
|
||||
bbox = [ float(x) for x in bbox ]
|
||||
|
||||
assert bbox[0] >= coord[0]
|
||||
assert bbox[1] <= coord[1]
|
||||
assert bbox[2] >= coord[2]
|
||||
assert bbox[3] <= coord[3]
|
||||
assert bbox in expected, "Bbox {} is not contained in {}.".format(bbox, expected)
|
||||
|
||||
@then(u'result (?P<lid>\d+ )?has centroid in (?P<coords>[\d,.-]+)')
|
||||
def step_impl(context, lid, coords):
|
||||
if lid is None:
|
||||
context.execute_steps("then at least 1 result is returned")
|
||||
bboxes = zip(context.response.property_list('lat'),
|
||||
context.response.property_list('lon'))
|
||||
centroids = zip(context.response.property_list('lon'),
|
||||
context.response.property_list('lat'))
|
||||
else:
|
||||
context.execute_steps("then more than %sresults are returned" % lid)
|
||||
context.execute_steps("then more than %sresults are returned".format(lid))
|
||||
res = context.response.result[int(lid)]
|
||||
bboxes = [ (res['lat'], res['lon']) ]
|
||||
centroids = [(res['lon'], res['lat'])]
|
||||
|
||||
coord = [ float(x) for x in coords.split(',') ]
|
||||
expected = Bbox(coords)
|
||||
|
||||
for lat, lon in bboxes:
|
||||
lat = float(lat)
|
||||
lon = float(lon)
|
||||
assert lat >= coord[0]
|
||||
assert lat <= coord[1]
|
||||
assert lon >= coord[2]
|
||||
assert lon <= coord[3]
|
||||
for centroid in centroids:
|
||||
assert centroid in expected,\
|
||||
"Centroid {} is not inside {}.".format(centroid, expected)
|
||||
|
||||
@then(u'there are(?P<neg> no)? duplicates')
|
||||
def check_for_duplicates(context, neg):
|
||||
|
Loading…
Reference in New Issue
Block a user