mirror of
https://github.com/osm-search/Nominatim.git
synced 2025-01-08 14:38:32 +03:00
340e7f7210
Also removes some functions that are no longer used and fixes debug output where the tests found an issue.
222 lines
8.0 KiB
Python
222 lines
8.0 KiB
Python
"""
|
|
Classes wrapping HTTP responses from the Nominatim API.
|
|
"""
|
|
from collections import OrderedDict
|
|
import re
|
|
import json
|
|
import xml.etree.ElementTree as ET
|
|
|
|
from check_functions import Almost
|
|
|
|
def _geojson_result_to_json_result(geojson_result):
|
|
result = geojson_result['properties']
|
|
result['geojson'] = geojson_result['geometry']
|
|
if 'bbox' in geojson_result:
|
|
# bbox is minlon, minlat, maxlon, maxlat
|
|
# boundingbox is minlat, maxlat, minlon, maxlon
|
|
result['boundingbox'] = [geojson_result['bbox'][1],
|
|
geojson_result['bbox'][3],
|
|
geojson_result['bbox'][0],
|
|
geojson_result['bbox'][2]]
|
|
return result
|
|
|
|
class BadRowValueAssert:
|
|
""" Lazily formatted message for failures to find a field content.
|
|
"""
|
|
|
|
def __init__(self, response, idx, field, value):
|
|
self.idx = idx
|
|
self.field = field
|
|
self.value = value
|
|
self.row = response.result[idx]
|
|
|
|
def __str__(self):
|
|
return "\nBad value for row {} field '{}'. Expected: {}, got: {}.\nFull row: {}"""\
|
|
.format(self.idx, self.field, self.value,
|
|
self.row[self.field], json.dumps(self.row, indent=4))
|
|
|
|
|
|
class GenericResponse:
|
|
""" Common base class for all API responses.
|
|
"""
|
|
def __init__(self, page, fmt, errorcode=200):
|
|
fmt = fmt.strip()
|
|
if fmt == 'jsonv2':
|
|
fmt = 'json'
|
|
|
|
self.page = page
|
|
self.format = fmt
|
|
self.errorcode = errorcode
|
|
self.result = []
|
|
self.header = dict()
|
|
|
|
if errorcode == 200 and fmt != 'debug':
|
|
getattr(self, '_parse_' + fmt)()
|
|
|
|
def _parse_json(self):
|
|
m = re.fullmatch(r'([\w$][^(]*)\((.*)\)', self.page)
|
|
if m is None:
|
|
code = self.page
|
|
else:
|
|
code = m.group(2)
|
|
self.header['json_func'] = m.group(1)
|
|
self.result = json.JSONDecoder(object_pairs_hook=OrderedDict).decode(code)
|
|
if isinstance(self.result, OrderedDict):
|
|
self.result = [self.result]
|
|
|
|
def _parse_geojson(self):
|
|
self._parse_json()
|
|
if 'error' in self.result[0]:
|
|
self.result = []
|
|
else:
|
|
self.result = list(map(_geojson_result_to_json_result, self.result[0]['features']))
|
|
|
|
def _parse_geocodejson(self):
|
|
self._parse_geojson()
|
|
if self.result is not None:
|
|
self.result = [r['geocoding'] for r in self.result]
|
|
|
|
def assert_field(self, idx, field, value):
|
|
""" Check that result row `idx` has a field `field` with value `value`.
|
|
Float numbers are matched approximately. When the expected value
|
|
starts with a carat, regular expression matching is used.
|
|
"""
|
|
assert field in self.result[idx], \
|
|
"Result row {} has no field '{}'.\nFull row: {}"\
|
|
.format(idx, field, json.dumps(self.result[idx], indent=4))
|
|
|
|
if isinstance(value, float):
|
|
assert Almost(value) == float(self.result[idx][field]), \
|
|
BadRowValueAssert(self, idx, field, value)
|
|
elif value.startswith("^"):
|
|
assert re.fullmatch(value, self.result[idx][field]), \
|
|
BadRowValueAssert(self, idx, field, value)
|
|
else:
|
|
assert str(self.result[idx][field]) == str(value), \
|
|
BadRowValueAssert(self, idx, field, value)
|
|
|
|
def assert_address_field(self, idx, field, value):
|
|
""" Check that result rows`idx` has a field `field` with value `value`
|
|
in its address. If idx is None, then all results are checked.
|
|
"""
|
|
if idx is None:
|
|
todo = range(len(self.result))
|
|
else:
|
|
todo = [int(idx)]
|
|
|
|
for idx in todo:
|
|
assert 'address' in self.result[idx], \
|
|
"Result row {} has no field 'address'.\nFull row: {}"\
|
|
.format(idx, json.dumps(self.result[idx], indent=4))
|
|
|
|
address = self.result[idx]['address']
|
|
assert field in address, \
|
|
"Result row {} has no field '{}' in address.\nFull address: {}"\
|
|
.format(idx, field, json.dumps(address, indent=4))
|
|
|
|
assert address[field] == value, \
|
|
"\nBad value for row {} field '{}' in address. Expected: {}, got: {}.\nFull address: {}"""\
|
|
.format(idx, field, value, address[field], json.dumps(address, indent=4))
|
|
|
|
def match_row(self, row):
|
|
""" Match the result fields against the given behave table row.
|
|
"""
|
|
if 'ID' in row.headings:
|
|
todo = [int(row['ID'])]
|
|
else:
|
|
todo = range(len(self.result))
|
|
|
|
for i in todo:
|
|
for name, value in zip(row.headings, row.cells):
|
|
if name == 'ID':
|
|
pass
|
|
elif name == 'osm':
|
|
self.assert_field(i, 'osm_type', value[0])
|
|
self.assert_field(i, 'osm_id', value[1:])
|
|
elif name == 'centroid':
|
|
lon, lat = value.split(' ')
|
|
self.assert_field(i, 'lat', float(lat))
|
|
self.assert_field(i, 'lon', float(lon))
|
|
else:
|
|
self.assert_field(i, name, value)
|
|
|
|
def property_list(self, prop):
|
|
return [x[prop] for x in self.result]
|
|
|
|
|
|
class SearchResponse(GenericResponse):
|
|
""" Specialised class for search and lookup responses.
|
|
Transforms the xml response in a format similar to json.
|
|
"""
|
|
|
|
def _parse_xml(self):
|
|
xml_tree = ET.fromstring(self.page)
|
|
|
|
self.header = dict(xml_tree.attrib)
|
|
|
|
for child in xml_tree:
|
|
assert child.tag == "place"
|
|
self.result.append(dict(child.attrib))
|
|
|
|
address = {}
|
|
for sub in child:
|
|
if sub.tag == 'extratags':
|
|
self.result[-1]['extratags'] = {}
|
|
for tag in sub:
|
|
self.result[-1]['extratags'][tag.attrib['key']] = tag.attrib['value']
|
|
elif sub.tag == 'namedetails':
|
|
self.result[-1]['namedetails'] = {}
|
|
for tag in sub:
|
|
self.result[-1]['namedetails'][tag.attrib['desc']] = tag.text
|
|
elif sub.tag == 'geokml':
|
|
self.result[-1][sub.tag] = True
|
|
else:
|
|
address[sub.tag] = sub.text
|
|
|
|
if address:
|
|
self.result[-1]['address'] = address
|
|
|
|
|
|
class ReverseResponse(GenericResponse):
|
|
""" Specialised class for reverse responses.
|
|
Transforms the xml response in a format similar to json.
|
|
"""
|
|
|
|
def _parse_xml(self):
|
|
xml_tree = ET.fromstring(self.page)
|
|
|
|
self.header = dict(xml_tree.attrib)
|
|
self.result = []
|
|
|
|
for child in xml_tree:
|
|
if child.tag == 'result':
|
|
assert not self.result, "More than one result in reverse result"
|
|
self.result.append(dict(child.attrib))
|
|
elif child.tag == 'addressparts':
|
|
address = {}
|
|
for sub in child:
|
|
address[sub.tag] = sub.text
|
|
self.result[0]['address'] = address
|
|
elif child.tag == 'extratags':
|
|
self.result[0]['extratags'] = {}
|
|
for tag in child:
|
|
self.result[0]['extratags'][tag.attrib['key']] = tag.attrib['value']
|
|
elif child.tag == 'namedetails':
|
|
self.result[0]['namedetails'] = {}
|
|
for tag in child:
|
|
self.result[0]['namedetails'][tag.attrib['desc']] = tag.text
|
|
elif child.tag == 'geokml':
|
|
self.result[0][child.tag] = True
|
|
else:
|
|
assert child.tag == 'error', \
|
|
"Unknown XML tag {} on page: {}".format(child.tag, self.page)
|
|
|
|
|
|
class StatusResponse(GenericResponse):
|
|
""" Specialised class for status responses.
|
|
Can also parse text responses.
|
|
"""
|
|
|
|
def _parse_text(self):
|
|
pass
|