Nominatim/tests/steps/api_result.py
2016-01-20 01:14:20 +01:00

276 lines
11 KiB
Python

""" Steps for checking the results of queries.
"""
from nose.tools import *
from lettuce import *
from tidylib import tidy_document
from collections import OrderedDict
import json
import logging
import re
from xml.dom.minidom import parseString
logger = logging.getLogger(__name__)
def _xml_text(node):
    """ Return the stripped value of the first child of `node`.
        The first child is expected to be a text node.
    """
    return node.firstChild.nodeValue.strip()


def _xml_extratags(node):
    """ Convert the <tag key=".." value=".."/> children of an
        <extratags> element into a plain dict.
    """
    tags = {}
    for tag in node.childNodes:
        assert_equals(tag.nodeName, 'tag')
        attrs = dict(tag.attributes.items())
        assert_in('key', attrs)
        assert_in('value', attrs)
        tags[attrs['key']] = attrs['value']
    return tags


def _xml_namedetails(node):
    """ Convert the <name desc="..">text</name> children of a
        <namedetails> element into a plain dict keyed by `desc`.
    """
    names = {}
    for tag in node.childNodes:
        assert_equals(tag.nodeName, 'name')
        attrs = dict(tag.attributes.items())
        assert_in('desc', attrs)
        names[attrs['desc']] = _xml_text(tag)
    return names


def _xml_place(node):
    """ Convert one <place> element of a search/lookup document into
        an ordered dict of its attributes plus the optional 'geokml',
        'extratags', 'namedetails' and 'address' entries.
    """
    assert_equals(node.nodeName, 'place',
                  msg="Unexpected element '%s'" % node.nodeName)
    place = OrderedDict(node.attributes.items())
    # These keys are synthesised from child elements below and must
    # therefore not already exist as attributes.
    for reserved in ('address', 'geokml', 'extratags', 'namedetails'):
        assert_not_in(reserved, place)

    address = OrderedDict()
    for sub in node.childNodes:
        if sub.nodeName == '#text':
            continue
        if sub.nodeName == 'geokml':
            place['geokml'] = sub.childNodes[0].toxml()
        elif sub.nodeName == 'extratags':
            place['extratags'] = _xml_extratags(sub)
        elif sub.nodeName == 'namedetails':
            place['namedetails'] = _xml_namedetails(sub)
        else:
            # every other child element is treated as an address part
            address[sub.nodeName] = _xml_text(sub)
    if address:
        place['address'] = address
    return place


def _xml_reverse(page):
    """ Fill world.results from the children of a <reversegeocode>
        document. At most one result is accepted and a <result> must
        not be combined with an <error>.
    """
    haserror = False
    for node in page.childNodes:
        name = node.nodeName
        if name == '#text':
            continue
        if name == 'result':
            assert_equals(len(world.results), 0)
            assert (not haserror)
            world.results.append(OrderedDict(node.attributes.items()))
            assert_not_in('display_name', world.results[0])
            assert_not_in('address', world.results[0])
            world.results[0]['display_name'] = _xml_text(node)
        elif name == 'error':
            assert_equals(len(world.results), 0)
            haserror = True
        elif name in ('addressparts', 'extratags', 'namedetails'):
            if name == 'addressparts':
                assert (not haserror)
            # The details are attached to the result, so fail with a
            # readable message instead of an IndexError when it is missing.
            assert world.results, \
                "Element '%s' found without a preceding result" % name
            if name == 'addressparts':
                address = OrderedDict()
                for sub in node.childNodes:
                    address[sub.nodeName] = _xml_text(sub)
                world.results[0]['address'] = address
            elif name == 'extratags':
                world.results[0]['extratags'] = _xml_extratags(node)
            else:
                world.results[0]['namedetails'] = _xml_namedetails(node)
        else:
            assert False, "Unknown content '%s' in XML" % name


def _parse_xml():
    """ Puts the DOM structure into more convenient python
        with a similar structure as the json document, so
        that the same semantics can be used. It does not
        check if the content is valid (or at least not more than
        necessary to transform it into a dict structure).

        The document is read from world.page. The attributes of the
        root element are stored in world.result_header and the list
        of parsed results in world.results.
    """
    page = parseString(world.page).documentElement

    # header info
    world.result_header = OrderedDict(page.attributes.items())
    logger.debug('Result header: %r' % (world.result_header,))
    world.results = []

    # results
    if page.nodeName in ('searchresults', 'lookupresults'):
        for node in page.childNodes:
            if node.nodeName != "#text":
                world.results.append(_xml_place(node))
    elif page.nodeName == 'reversegeocode':
        _xml_reverse(page)
    else:
        assert False, "Unknown document node name %s in XML" % page.nodeName

    logger.debug("The following was parsed out of XML:")
    logger.debug(world.results)
@step(u'a HTTP (\d+) is returned')
def api_result_http_error(step, error):
    """ Check that the recorded return code of the last request
        equals the status code named in the step sentence.
    """
    expected_code = int(error)
    assert_equals(world.returncode, expected_code)
@step(u'the result is valid( \w+)?')
def api_result_is_valid(step, fmt):
    """ Check that the request returned with code 200 and convert
        world.page into world.results according to
        world.response_format ('xml', 'json' or 'html').

        If the step sentence names a format, it must be the same as
        the response format.
    """
    assert_equals(world.returncode, 200)

    response_format = world.response_format
    if response_format == 'xml':
        _parse_xml()
    elif response_format == 'json':
        decoder = json.JSONDecoder(object_pairs_hook=OrderedDict)
        world.results = decoder.decode(world.page)
        if world.request_type == 'reverse':
            # a reverse request yields a single object, wrap it so that
            # results can always be addressed by index
            world.results = (world.results,)
    elif response_format == 'html':
        # The errors reported by tidy are currently not checked.
        tidied, _errors = tidy_document(world.page,
                                        options={'char-encoding' : 'utf8'})
        world.results = tidied
    else:
        assert False, "Unknown page format: %s" % (world.response_format)

    if fmt:
        assert_equals(fmt.strip(), world.response_format)
def compare(operator, op1, op2):
    """ Compare op1 with op2 using the comparison named by `operator`
        ('less than', 'more than', 'exactly', 'at least' or 'at most')
        and return the result. Any other name raises an Exception.
    """
    comparisons = (
        ('less than', lambda left, right: left < right),
        ('more than', lambda left, right: left > right),
        ('exactly', lambda left, right: left == right),
        ('at least', lambda left, right: left >= right),
        ('at most', lambda left, right: left <= right),
    )
    for name, check in comparisons:
        if operator == name:
            return check(op1, op2)
    raise Exception("unknown operator '%s'" % operator)
@step(u'(less than|more than|exactly|at least|at most) (\d+) results? (?:is|are) returned')
def validate_result_number(step, operator, number):
    """ Check the number of entries in world.results against the
        expected count using the comparison named in the step
        sentence. Runs the step 'the result is valid' first.
    """
    step.given('the result is valid')
    found = len(world.results)
    matches = compare(operator, found, int(number))
    assert matches, \
        "Bad number of results: expected %s %s, got %d." % (operator, number, found)
@step(u'result (\d+) has( not)? attributes (\S+)')
def search_check_for_result_attribute(step, num, invalid, attrs):
    """ Check that the result with index `num` contains (or, when
        `invalid` is set, does not contain) every attribute of the
        comma-separated list `attrs`.
    """
    index = int(num)
    step.given('at least %d results are returned' % (index + 1))
    result = world.results[index]
    names = [name.strip() for name in attrs.split(',')]
    for name in names:
        if not invalid:
            assert_in(name, result)
        else:
            assert_not_in(name, result)
@step(u'there is a json wrapper "([^"]*)"')
def api_result_check_json_wrapper(step, wrapper):
    """ Check that the result is valid json and that
        world.json_callback equals the wrapper name given in
        the step sentence.
    """
    step.given('the result is valid json')
    callback = world.json_callback
    assert_equals(callback, wrapper)
@step(u'result header contains')
def api_result_header_contains(step):
    """ Check attributes of the result header against the table of
        the step. Each table line names an attribute ('attr') and a
        regular expression ('value') which must match the complete
        attribute value.
    """
    step.given('the result is valid')
    for line in step.hashes:
        attr = line['attr']
        pattern = line['value']
        assert_in(attr, world.result_header)
        value = world.result_header[attr]
        m = re.match("%s$" % (pattern,), value)
        # The match result has to be asserted, otherwise the step
        # succeeds for any header value.
        assert_is_not_none(m, msg="header attribute %s does not match: %s$ != %s."
                                  % (attr, pattern, value))
@step(u'result header has no attribute (.*)')
def api_result_header_contains_not(step, attr):
    """ Check that the result is valid and that the given
        attribute is absent from world.result_header.
    """
    step.given('the result is valid')
    header = world.result_header
    assert_not_in(attr, header)
@step(u'results contain$')
def api_result_contains(step):
    """ Check fields of the results against the table of the step.

        A line with an 'ID' column is checked against the result with
        that index only, any other line against all results. The
        remaining columns name the fields to check:

         * 'latlon' hands the 'lat'/'lon' fields of the result and the
           expected value to world.match_geometry
         * a value starting with '<', '>' or '=' is appended to the
           field content and the resulting expression must evaluate
           to true
         * any other value is a regular expression that must match
           the complete field content
    """
    step.given('at least 1 result is returned')
    for line in step.hashes:
        if 'ID' in line:
            reslist = (world.results[int(line['ID'])],)
        else:
            reslist = world.results
        # items() works with Python 2 as well as Python 3 dicts
        for k, v in line.items():
            if k == 'latlon':
                for curres in reslist:
                    world.match_geometry((float(curres['lat']), float(curres['lon'])), v)
            elif k != 'ID':
                for curres in reslist:
                    assert_in(k, curres)
                    # An empty cell has no first character, it is
                    # handled by the regex branch (matches empty fields).
                    if v and v[0] in '<>=':
                        # mathematical operation
                        # NOTE(security): eval() executes content taken from
                        # the response and the feature file. Only run this
                        # against trusted servers and trusted test data.
                        evalexp = '%s %s' % (curres[k], v)
                        res = eval(evalexp)
                        logger.debug('Evaluating: %s = %s', res, evalexp)
                        assert_true(res, "Evaluation failed: %s" % (evalexp, ))
                    else:
                        # regex match
                        m = re.match("%s$" % (v,), curres[k])
                        assert_is_not_none(m, msg="field %s does not match: %s$ != %s." % (k, v, curres[k]))
@step(u'result addresses contain$')
def api_result_address_contains(step):
    """ Check address parts of the results against the table of the
        step.

        A line with an 'ID' column is checked against the result with
        that index only, any other line against all results. Every
        other column names an address part whose content must be
        matched completely by the regular expression in the cell.
    """
    step.given('the result is valid')
    for line in step.hashes:
        if 'ID' in line:
            reslist = (world.results[int(line['ID'])],)
        else:
            reslist = world.results
        # items() works with Python 2 as well as Python 3 dicts
        for k, v in line.items():
            if k == 'ID':
                continue
            for res in reslist:
                # report a missing address as a failed check, not a KeyError
                assert_in('address', res)
                curres = res['address']
                assert_in(k, curres)
                m = re.match("%s$" % (v,), curres[k])
                assert_is_not_none(m, msg="field %s does not match: %s$ != %s." % (k, v, curres[k]))
@step(u'address of result (\d+) contains')
def api_result_address_exact(step, resid):
    """ Check that the address of the result with index `resid`
        contains the parts listed in the table of the step. Each line
        names a part ('type') and a regular expression ('value')
        which must match the complete content of the part.
    """
    index = int(resid)
    step.given('at least %d results are returned' % (index + 1))
    address = world.results[index]['address']
    for row in step.hashes:
        part = row['type']
        assert_in(part, address)
        found = re.match("%s$" % row['value'], address[part])
        assert_is_not_none(found, msg="field %s does not match: %s$ != %s." % (
                           row['type'], row['value'], address[row['type']]))
@step(u'address of result (\d+) does not contain (.*)')
def api_result_address_details_missing(step, resid, types):
    """ Check that none of the address parts in the comma-separated
        list `types` appears in the address of the result with
        index `resid`.
    """
    index = int(resid)
    step.given('at least %d results are returned' % (index + 1))
    address = world.results[index]['address']
    unwanted = [part.strip() for part in types.split(',')]
    for part in unwanted:
        assert_not_in(part, address)
@step(u'address of result (\d+) is')
def api_result_address_exact(step, resid):
    """ Check that the address of the result with index `resid`
        consists of exactly the parts listed in the table of the step
        ('type' and 'value' columns), in the same order.
    """
    # NOTE(review): the step for 'address of result N contains' uses the
    # same function name; presumably both stay registered because @step
    # is applied at definition time -- confirm before renaming.
    resid = int(resid)
    step.given('at least %d results are returned' % (resid + 1))
    address = world.results[resid]['address']
    assert_equals(len(step.hashes), len(address))
    # Walk expected lines and actual parts in parallel; items() works
    # with Python 2 as well as Python 3 dicts.
    for expected, (k, v) in zip(step.hashes, address.items()):
        assert_equals(expected['type'], k)
        assert_equals(expected['value'], v)
@step('there are( no)? duplicates')
def api_result_check_for_duplicates(step, nodups=None):
    """ Check the results for duplicates. Two results count as equal
        when 'osm_type', 'class', 'type' and 'display_name' agree.

        When `nodups` is None at least one duplicate must exist,
        otherwise all results must be distinct.
    """
    step.given('at least 1 result is returned')
    keys = [(entry['osm_type'], entry['class'],
             entry['type'], entry['display_name'])
            for entry in world.results]
    if nodups is not None:
        assert_equal(len(keys), len(set(keys)))
    else:
        assert len(keys) > len(set(keys))