diff --git a/src/nominatim_api/v1/server_glue.py b/src/nominatim_api/v1/server_glue.py
index c00b580b..0e954901 100644
--- a/src/nominatim_api/v1/server_glue.py
+++ b/src/nominatim_api/v1/server_glue.py
@@ -86,44 +86,6 @@ class ASGIAdaptor(abc.ABC):
"""
- def build_response(self, output: str, status: int = 200, num_results: int = 0) -> Any:
- """ Create a response from the given output. Wraps a JSONP function
- around the response, if necessary.
- """
- if self.content_type == CONTENT_JSON and status == 200:
- jsonp = self.get('json_callback')
- if jsonp is not None:
- if any(not part.isidentifier() for part in jsonp.split('.')):
- self.raise_error('Invalid json_callback value')
- output = f"{jsonp}({output})"
- self.content_type = 'application/javascript; charset=utf-8'
-
- return self.create_response(status, output, num_results)
-
-
- def raise_error(self, msg: str, status: int = 400) -> NoReturn:
- """ Raise an exception resulting in the given HTTP status and
- message. The message will be formatted according to the
- output format chosen by the request.
- """
- if self.content_type == CONTENT_XML:
- msg = f"""
-
- {status}
- {msg}
-
- """
- elif self.content_type == CONTENT_JSON:
- msg = f"""{{"error":{{"code":{status},"message":"{msg}"}}}}"""
- elif self.content_type == CONTENT_HTML:
- loglib.log().section('Execution error')
- loglib.log().var_dump('Status', status)
- loglib.log().var_dump('Message', msg)
- msg = loglib.get_and_disable()
-
- raise self.error(msg, status)
-
-
def get_int(self, name: str, default: Optional[int] = None) -> int:
""" Return an input parameter as an int. Raises an exception if
the parameter is given but not in an integer format.
@@ -194,81 +156,120 @@ class ASGIAdaptor(abc.ABC):
return value != '0'
- def get_accepted_languages(self) -> str:
- """ Return the accepted languages.
+ def raise_error(self, msg: str, status: int = 400) -> NoReturn:
+ """ Raise an exception resulting in the given HTTP status and
+ message. The message will be formatted according to the
+ output format chosen by the request.
"""
- return self.get('accept-language')\
- or self.get_header('accept-language')\
- or self.config().DEFAULT_LANGUAGE
+ if self.content_type == CONTENT_XML:
+ msg = f"""
+
+ {status}
+ {msg}
+
+ """
+ elif self.content_type == CONTENT_JSON:
+ msg = f"""{{"error":{{"code":{status},"message":"{msg}"}}}}"""
+ elif self.content_type == CONTENT_HTML:
+ loglib.log().section('Execution error')
+ loglib.log().var_dump('Status', status)
+ loglib.log().var_dump('Message', msg)
+ msg = loglib.get_and_disable()
+
+ raise self.error(msg, status)
- def setup_debugging(self) -> bool:
- """ Set up collection of debug information if requested.
+def build_response(adaptor: ASGIAdaptor, output: str, status: int = 200,
+ num_results: int = 0) -> Any:
+ """ Create a response from the given output. Wraps a JSONP function
+ around the response, if necessary.
+ """
+ if adaptor.content_type == CONTENT_JSON and status == 200:
+ jsonp = adaptor.get('json_callback')
+ if jsonp is not None:
+ if any(not part.isidentifier() for part in jsonp.split('.')):
+ adaptor.raise_error('Invalid json_callback value')
+ output = f"{jsonp}({output})"
+ adaptor.content_type = 'application/javascript; charset=utf-8'
- Return True when debugging was requested.
- """
- if self.get_bool('debug', False):
- loglib.set_log_output('html')
- self.content_type = CONTENT_HTML
- return True
-
- return False
+ return adaptor.create_response(status, output, num_results)
- def get_layers(self) -> Optional[DataLayer]:
- """ Return a parsed version of the layer parameter.
- """
- param = self.get('layer', None)
- if param is None:
- return None
-
- return cast(DataLayer,
- reduce(DataLayer.__or__,
- (getattr(DataLayer, s.upper()) for s in param.split(','))))
+def get_accepted_languages(adaptor: ASGIAdaptor) -> str:
+ """ Return the accepted languages.
+ """
+ return adaptor.get('accept-language')\
+ or adaptor.get_header('accept-language')\
+ or adaptor.config().DEFAULT_LANGUAGE
- def parse_format(self, result_type: Type[Any], default: str) -> str:
- """ Get and check the 'format' parameter and prepare the formatter.
- `result_type` is the type of result to be returned by the function
- and `default` the format value to assume when no parameter is present.
- """
- fmt = self.get('format', default=default)
- assert fmt is not None
+def setup_debugging(adaptor: ASGIAdaptor) -> bool:
+ """ Set up collection of debug information if requested.
- if not formatting.supports_format(result_type, fmt):
- self.raise_error("Parameter 'format' must be one of: " +
- ', '.join(formatting.list_formats(result_type)))
+ Return True when debugging was requested.
+ """
+ if adaptor.get_bool('debug', False):
+ loglib.set_log_output('html')
+ adaptor.content_type = CONTENT_HTML
+ return True
- self.content_type = CONTENT_TYPE.get(fmt, CONTENT_JSON)
- return fmt
+ return False
- def parse_geometry_details(self, fmt: str) -> Dict[str, Any]:
- """ Create details structure from the supplied geometry parameters.
- """
- numgeoms = 0
- output = GeometryFormat.NONE
- if self.get_bool('polygon_geojson', False):
- output |= GeometryFormat.GEOJSON
+def get_layers(adaptor: ASGIAdaptor) -> Optional[DataLayer]:
+ """ Return a parsed version of the layer parameter.
+ """
+ param = adaptor.get('layer', None)
+ if param is None:
+ return None
+
+ return cast(DataLayer,
+ reduce(DataLayer.__or__,
+ (getattr(DataLayer, s.upper()) for s in param.split(','))))
+
+
+def parse_format(adaptor: ASGIAdaptor, result_type: Type[Any], default: str) -> str:
+ """ Get and check the 'format' parameter and prepare the formatter.
+ `result_type` is the type of result to be returned by the function
+ and `default` the format value to assume when no parameter is present.
+ """
+ fmt = adaptor.get('format', default=default)
+ assert fmt is not None
+
+ if not formatting.supports_format(result_type, fmt):
+ adaptor.raise_error("Parameter 'format' must be one of: " +
+ ', '.join(formatting.list_formats(result_type)))
+
+ adaptor.content_type = CONTENT_TYPE.get(fmt, CONTENT_JSON)
+ return fmt
+
+
+def parse_geometry_details(adaptor: ASGIAdaptor, fmt: str) -> Dict[str, Any]:
+ """ Create details structure from the supplied geometry parameters.
+ """
+ numgeoms = 0
+ output = GeometryFormat.NONE
+ if adaptor.get_bool('polygon_geojson', False):
+ output |= GeometryFormat.GEOJSON
+ numgeoms += 1
+ if fmt not in ('geojson', 'geocodejson'):
+ if adaptor.get_bool('polygon_text', False):
+ output |= GeometryFormat.TEXT
+ numgeoms += 1
+ if adaptor.get_bool('polygon_kml', False):
+ output |= GeometryFormat.KML
+ numgeoms += 1
+ if adaptor.get_bool('polygon_svg', False):
+ output |= GeometryFormat.SVG
numgeoms += 1
- if fmt not in ('geojson', 'geocodejson'):
- if self.get_bool('polygon_text', False):
- output |= GeometryFormat.TEXT
- numgeoms += 1
- if self.get_bool('polygon_kml', False):
- output |= GeometryFormat.KML
- numgeoms += 1
- if self.get_bool('polygon_svg', False):
- output |= GeometryFormat.SVG
- numgeoms += 1
- if numgeoms > self.config().get_int('POLYGON_OUTPUT_MAX_TYPES'):
- self.raise_error('Too many polygon output options selected.')
+ if numgeoms > adaptor.config().get_int('POLYGON_OUTPUT_MAX_TYPES'):
+ adaptor.raise_error('Too many polygon output options selected.')
- return {'address_details': True,
- 'geometry_simplification': self.get_float('polygon_threshold', 0.0),
- 'geometry_output': output
- }
+ return {'address_details': True,
+ 'geometry_simplification': adaptor.get_float('polygon_threshold', 0.0),
+ 'geometry_output': output
+ }
async def status_endpoint(api: NominatimAPIAsync, params: ASGIAdaptor) -> Any:
@@ -276,21 +277,21 @@ async def status_endpoint(api: NominatimAPIAsync, params: ASGIAdaptor) -> Any:
"""
result = await api.status()
- fmt = params.parse_format(StatusResult, 'text')
+ fmt = parse_format(params, StatusResult, 'text')
if fmt == 'text' and result.status:
status_code = 500
else:
status_code = 200
- return params.build_response(formatting.format_result(result, fmt, {}),
+ return build_response(params, formatting.format_result(result, fmt, {}),
status=status_code)
async def details_endpoint(api: NominatimAPIAsync, params: ASGIAdaptor) -> Any:
""" Server glue for /details endpoint. See API docs for details.
"""
- fmt = params.parse_format(DetailedResult, 'json')
+ fmt = parse_format(params, DetailedResult, 'json')
place_id = params.get_int('place_id', 0)
place: PlaceRef
if place_id:
@@ -301,9 +302,9 @@ async def details_endpoint(api: NominatimAPIAsync, params: ASGIAdaptor) -> Any:
params.raise_error("Missing ID parameter 'place_id' or 'osmtype'.")
place = OsmID(osmtype, params.get_int('osmid'), params.get('class'))
- debug = params.setup_debugging()
+ debug = setup_debugging(params)
- locales = Locales.from_accept_languages(params.get_accepted_languages())
+ locales = Locales.from_accept_languages(get_accepted_languages(params))
result = await api.details(place,
address_details=params.get_bool('addressdetails', False),
@@ -317,7 +318,7 @@ async def details_endpoint(api: NominatimAPIAsync, params: ASGIAdaptor) -> Any:
)
if debug:
- return params.build_response(loglib.get_and_disable())
+ return build_response(params, loglib.get_and_disable())
if result is None:
params.raise_error('No place with that OSM ID found.', status=404)
@@ -327,25 +328,25 @@ async def details_endpoint(api: NominatimAPIAsync, params: ASGIAdaptor) -> Any:
'group_hierarchy': params.get_bool('group_hierarchy', False),
'icon_base_url': params.config().MAPICON_URL})
- return params.build_response(output, num_results=1)
+ return build_response(params, output, num_results=1)
async def reverse_endpoint(api: NominatimAPIAsync, params: ASGIAdaptor) -> Any:
""" Server glue for /reverse endpoint. See API docs for details.
"""
- fmt = params.parse_format(ReverseResults, 'xml')
- debug = params.setup_debugging()
+ fmt = parse_format(params, ReverseResults, 'xml')
+ debug = setup_debugging(params)
coord = Point(params.get_float('lon'), params.get_float('lat'))
- details = params.parse_geometry_details(fmt)
+ details = parse_geometry_details(params, fmt)
details['max_rank'] = helpers.zoom_to_rank(params.get_int('zoom', 18))
- details['layers'] = params.get_layers()
- details['locales'] = Locales.from_accept_languages(params.get_accepted_languages())
+ details['layers'] = get_layers(params)
+ details['locales'] = Locales.from_accept_languages(get_accepted_languages(params))
result = await api.reverse(coord, **details)
if debug:
- return params.build_response(loglib.get_and_disable(), num_results=1 if result else 0)
+ return build_response(params, loglib.get_and_disable(), num_results=1 if result else 0)
if fmt == 'xml':
queryparts = {'lat': str(coord.lat), 'lon': str(coord.lon), 'format': 'xml'}
@@ -364,16 +365,16 @@ async def reverse_endpoint(api: NominatimAPIAsync, params: ASGIAdaptor) -> Any:
output = formatting.format_result(ReverseResults([result] if result else []),
fmt, fmt_options)
- return params.build_response(output, num_results=1 if result else 0)
+ return build_response(params, output, num_results=1 if result else 0)
async def lookup_endpoint(api: NominatimAPIAsync, params: ASGIAdaptor) -> Any:
""" Server glue for /lookup endpoint. See API docs for details.
"""
- fmt = params.parse_format(SearchResults, 'xml')
- debug = params.setup_debugging()
- details = params.parse_geometry_details(fmt)
- details['locales'] = Locales.from_accept_languages(params.get_accepted_languages())
+ fmt = parse_format(params, SearchResults, 'xml')
+ debug = setup_debugging(params)
+ details = parse_geometry_details(params, fmt)
+ details['locales'] = Locales.from_accept_languages(get_accepted_languages(params))
places = []
for oid in (params.get('osm_ids') or '').split(','):
@@ -390,7 +391,7 @@ async def lookup_endpoint(api: NominatimAPIAsync, params: ASGIAdaptor) -> Any:
results = SearchResults()
if debug:
- return params.build_response(loglib.get_and_disable(), num_results=len(results))
+ return build_response(params, loglib.get_and_disable(), num_results=len(results))
fmt_options = {'extratags': params.get_bool('extratags', False),
'namedetails': params.get_bool('namedetails', False),
@@ -398,7 +399,7 @@ async def lookup_endpoint(api: NominatimAPIAsync, params: ASGIAdaptor) -> Any:
output = formatting.format_result(results, fmt, fmt_options)
- return params.build_response(output, num_results=len(results))
+ return build_response(params, output, num_results=len(results))
async def _unstructured_search(query: str, api: NominatimAPIAsync,
@@ -435,9 +436,9 @@ async def _unstructured_search(query: str, api: NominatimAPIAsync,
async def search_endpoint(api: NominatimAPIAsync, params: ASGIAdaptor) -> Any:
""" Server glue for /search endpoint. See API docs for details.
"""
- fmt = params.parse_format(SearchResults, 'jsonv2')
- debug = params.setup_debugging()
- details = params.parse_geometry_details(fmt)
+ fmt = parse_format(params, SearchResults, 'jsonv2')
+ debug = setup_debugging(params)
+ details = parse_geometry_details(params, fmt)
details['countries'] = params.get('countrycodes', None)
details['excluded'] = params.get('exclude_place_ids', None)
@@ -454,9 +455,9 @@ async def search_endpoint(api: NominatimAPIAsync, params: ASGIAdaptor) -> Any:
if params.get('featureType', None) is not None:
details['layers'] = DataLayer.ADDRESS
else:
- details['layers'] = params.get_layers()
+ details['layers'] = get_layers(params)
- details['locales'] = Locales.from_accept_languages(params.get_accepted_languages())
+ details['locales'] = Locales.from_accept_languages(get_accepted_languages(params))
# unstructured query parameters
query = params.get('q', None)
@@ -486,7 +487,7 @@ async def search_endpoint(api: NominatimAPIAsync, params: ASGIAdaptor) -> Any:
results = helpers.deduplicate_results(results, max_results)
if debug:
- return params.build_response(loglib.get_and_disable(), num_results=len(results))
+ return build_response(params, loglib.get_and_disable(), num_results=len(results))
if fmt == 'xml':
helpers.extend_query_parts(queryparts, details,
@@ -509,7 +510,7 @@ async def search_endpoint(api: NominatimAPIAsync, params: ASGIAdaptor) -> Any:
output = formatting.format_result(results, fmt, fmt_options)
- return params.build_response(output, num_results=len(results))
+ return build_response(params, output, num_results=len(results))
async def deletable_endpoint(api: NominatimAPIAsync, params: ASGIAdaptor) -> Any:
@@ -518,7 +519,7 @@ async def deletable_endpoint(api: NominatimAPIAsync, params: ASGIAdaptor) -> Any
deleted or are broken in the OSM data but are kept in the
Nominatim database to minimize disruption.
"""
- fmt = params.parse_format(RawDataList, 'json')
+ fmt = parse_format(params, RawDataList, 'json')
async with api.begin() as conn:
sql = sa.text(""" SELECT p.place_id, country_code,
@@ -529,7 +530,7 @@ async def deletable_endpoint(api: NominatimAPIAsync, params: ASGIAdaptor) -> Any
""")
results = RawDataList(r._asdict() for r in await conn.execute(sql))
- return params.build_response(formatting.format_result(results, fmt, {}))
+ return build_response(params, formatting.format_result(results, fmt, {}))
async def polygons_endpoint(api: NominatimAPIAsync, params: ASGIAdaptor) -> Any:
@@ -538,7 +539,7 @@ async def polygons_endpoint(api: NominatimAPIAsync, params: ASGIAdaptor) -> Any:
their size but are kept in the Nominatim database with their
old area to minimize disruption.
"""
- fmt = params.parse_format(RawDataList, 'json')
+ fmt = parse_format(params, RawDataList, 'json')
sql_params: Dict[str, Any] = {
'days': params.get_int('days', -1),
'cls': params.get('class')
@@ -561,7 +562,7 @@ async def polygons_endpoint(api: NominatimAPIAsync, params: ASGIAdaptor) -> Any:
results = RawDataList(r._asdict() for r in await conn.execute(sql, sql_params))
- return params.build_response(formatting.format_result(results, fmt, {}))
+ return build_response(params, formatting.format_result(results, fmt, {}))
EndpointFunc = Callable[[NominatimAPIAsync, ASGIAdaptor], Any]
diff --git a/test/python/api/test_server_glue_v1.py b/test/python/api/test_server_glue_v1.py
index 5716f245..80cd51a3 100644
--- a/test/python/api/test_server_glue_v1.py
+++ b/test/python/api/test_server_glue_v1.py
@@ -59,14 +59,14 @@ def test_adaptor_get_bool_falsish():
def test_adaptor_parse_format_use_default():
adaptor = FakeAdaptor()
- assert adaptor.parse_format(napi.StatusResult, 'text') == 'text'
+ assert glue.parse_format(adaptor, napi.StatusResult, 'text') == 'text'
assert adaptor.content_type == 'text/plain; charset=utf-8'
def test_adaptor_parse_format_use_configured():
adaptor = FakeAdaptor(params={'format': 'json'})
- assert adaptor.parse_format(napi.StatusResult, 'text') == 'json'
+ assert glue.parse_format(adaptor, napi.StatusResult, 'text') == 'json'
assert adaptor.content_type == 'application/json; charset=utf-8'
@@ -74,37 +74,37 @@ def test_adaptor_parse_format_invalid_value():
adaptor = FakeAdaptor(params={'format': '@!#'})
with pytest.raises(FakeError, match='^400 -- .*must be one of'):
- adaptor.parse_format(napi.StatusResult, 'text')
+ glue.parse_format(adaptor, napi.StatusResult, 'text')
# ASGIAdaptor.get_accepted_languages()
def test_accepted_languages_from_param():
a = FakeAdaptor(params={'accept-language': 'de'})
- assert a.get_accepted_languages() == 'de'
+ assert glue.get_accepted_languages(a) == 'de'
def test_accepted_languages_from_header():
a = FakeAdaptor(headers={'accept-language': 'de'})
- assert a.get_accepted_languages() == 'de'
+ assert glue.get_accepted_languages(a) == 'de'
def test_accepted_languages_from_default(monkeypatch):
monkeypatch.setenv('NOMINATIM_DEFAULT_LANGUAGE', 'de')
a = FakeAdaptor()
- assert a.get_accepted_languages() == 'de'
+ assert glue.get_accepted_languages(a) == 'de'
def test_accepted_languages_param_over_header():
a = FakeAdaptor(params={'accept-language': 'de'},
headers={'accept-language': 'en'})
- assert a.get_accepted_languages() == 'de'
+ assert glue.get_accepted_languages(a) == 'de'
def test_accepted_languages_header_over_default(monkeypatch):
monkeypatch.setenv('NOMINATIM_DEFAULT_LANGUAGE', 'en')
a = FakeAdaptor(headers={'accept-language': 'de'})
- assert a.get_accepted_languages() == 'de'
+ assert glue.get_accepted_languages(a) == 'de'
# ASGIAdaptor.raise_error()
@@ -114,7 +114,7 @@ class TestAdaptorRaiseError:
@pytest.fixture(autouse=True)
def init_adaptor(self):
self.adaptor = FakeAdaptor()
- self.adaptor.setup_debugging()
+ glue.setup_debugging(self.adaptor)
def run_raise_error(self, msg, status):
with pytest.raises(FakeError) as excinfo:
@@ -155,7 +155,7 @@ class TestAdaptorRaiseError:
def test_raise_error_during_debug():
a = FakeAdaptor(params={'debug': '1'})
- a.setup_debugging()
+ glue.setup_debugging(a)
loglib.log().section('Ongoing')
with pytest.raises(FakeError) as excinfo:
@@ -172,7 +172,7 @@ def test_raise_error_during_debug():
# ASGIAdaptor.build_response
def test_build_response_without_content_type():
- resp = FakeAdaptor().build_response('attention')
+ resp = glue.build_response(FakeAdaptor(), 'attention')
assert isinstance(resp, FakeResponse)
assert resp.status == 200
@@ -182,9 +182,9 @@ def test_build_response_without_content_type():
def test_build_response_with_status():
a = FakeAdaptor(params={'format': 'json'})
- a.parse_format(napi.StatusResult, 'text')
+ glue.parse_format(a, napi.StatusResult, 'text')
- resp = a.build_response('stuff\nmore stuff', status=404)
+ resp = glue.build_response(a, 'stuff\nmore stuff', status=404)
assert isinstance(resp, FakeResponse)
assert resp.status == 404
@@ -194,9 +194,9 @@ def test_build_response_with_status():
def test_build_response_jsonp_with_json():
a = FakeAdaptor(params={'format': 'json', 'json_callback': 'test.func'})
- a.parse_format(napi.StatusResult, 'text')
+ glue.parse_format(a, napi.StatusResult, 'text')
- resp = a.build_response('{}')
+ resp = glue.build_response(a, '{}')
assert isinstance(resp, FakeResponse)
assert resp.status == 200
@@ -206,9 +206,9 @@ def test_build_response_jsonp_with_json():
def test_build_response_jsonp_without_json():
a = FakeAdaptor(params={'format': 'text', 'json_callback': 'test.func'})
- a.parse_format(napi.StatusResult, 'text')
+ glue.parse_format(a, napi.StatusResult, 'text')
- resp = a.build_response('{}')
+ resp = glue.build_response(a, '{}')
assert isinstance(resp, FakeResponse)
assert resp.status == 200
@@ -219,10 +219,10 @@ def test_build_response_jsonp_without_json():
@pytest.mark.parametrize('param', ['alert(); func', '\\n', '', 'a b'])
def test_build_response_jsonp_bad_format(param):
a = FakeAdaptor(params={'format': 'json', 'json_callback': param})
- a.parse_format(napi.StatusResult, 'text')
+ glue.parse_format(a, napi.StatusResult, 'text')
with pytest.raises(FakeError, match='^400 -- .*Invalid'):
- a.build_response('{}')
+ glue.build_response(a, '{}')
# status_endpoint()