remove v1-specific functions from ASGIAdaptor

This commit is contained in:
Sarah Hoffmann 2024-08-13 19:38:14 +02:00
parent 4f4a288757
commit d22ca186e4
2 changed files with 149 additions and 148 deletions

View File

@ -86,44 +86,6 @@ class ASGIAdaptor(abc.ABC):
"""
def build_response(self, output: str, status: int = 200, num_results: int = 0) -> Any:
""" Create a response from the given output. Wraps a JSONP function
around the response, if necessary.
"""
if self.content_type == CONTENT_JSON and status == 200:
jsonp = self.get('json_callback')
if jsonp is not None:
if any(not part.isidentifier() for part in jsonp.split('.')):
self.raise_error('Invalid json_callback value')
output = f"{jsonp}({output})"
self.content_type = 'application/javascript; charset=utf-8'
return self.create_response(status, output, num_results)
def raise_error(self, msg: str, status: int = 400) -> NoReturn:
""" Raise an exception resulting in the given HTTP status and
message. The message will be formatted according to the
output format chosen by the request.
"""
if self.content_type == CONTENT_XML:
msg = f"""<?xml version="1.0" encoding="UTF-8" ?>
<error>
<code>{status}</code>
<message>{msg}</message>
</error>
"""
elif self.content_type == CONTENT_JSON:
msg = f"""{{"error":{{"code":{status},"message":"{msg}"}}}}"""
elif self.content_type == CONTENT_HTML:
loglib.log().section('Execution error')
loglib.log().var_dump('Status', status)
loglib.log().var_dump('Message', msg)
msg = loglib.get_and_disable()
raise self.error(msg, status)
def get_int(self, name: str, default: Optional[int] = None) -> int:
""" Return an input parameter as an int. Raises an exception if
the parameter is given but not in an integer format.
@ -194,81 +156,120 @@ class ASGIAdaptor(abc.ABC):
return value != '0'
def get_accepted_languages(self) -> str:
""" Return the accepted languages.
def raise_error(self, msg: str, status: int = 400) -> NoReturn:
""" Raise an exception resulting in the given HTTP status and
message. The message will be formatted according to the
output format chosen by the request.
"""
return self.get('accept-language')\
or self.get_header('accept-language')\
or self.config().DEFAULT_LANGUAGE
if self.content_type == CONTENT_XML:
msg = f"""<?xml version="1.0" encoding="UTF-8" ?>
<error>
<code>{status}</code>
<message>{msg}</message>
</error>
"""
elif self.content_type == CONTENT_JSON:
msg = f"""{{"error":{{"code":{status},"message":"{msg}"}}}}"""
elif self.content_type == CONTENT_HTML:
loglib.log().section('Execution error')
loglib.log().var_dump('Status', status)
loglib.log().var_dump('Message', msg)
msg = loglib.get_and_disable()
raise self.error(msg, status)
def setup_debugging(self) -> bool:
""" Set up collection of debug information if requested.
def build_response(adaptor: ASGIAdaptor, output: str, status: int = 200,
num_results: int = 0) -> Any:
""" Create a response from the given output. Wraps a JSONP function
around the response, if necessary.
"""
if adaptor.content_type == CONTENT_JSON and status == 200:
jsonp = adaptor.get('json_callback')
if jsonp is not None:
if any(not part.isidentifier() for part in jsonp.split('.')):
adaptor.raise_error('Invalid json_callback value')
output = f"{jsonp}({output})"
adaptor.content_type = 'application/javascript; charset=utf-8'
Return True when debugging was requested.
"""
if self.get_bool('debug', False):
loglib.set_log_output('html')
self.content_type = CONTENT_HTML
return True
return False
return adaptor.create_response(status, output, num_results)
def get_layers(self) -> Optional[DataLayer]:
""" Return a parsed version of the layer parameter.
"""
param = self.get('layer', None)
if param is None:
return None
return cast(DataLayer,
reduce(DataLayer.__or__,
(getattr(DataLayer, s.upper()) for s in param.split(','))))
def get_accepted_languages(adaptor: ASGIAdaptor) -> str:
""" Return the accepted languages.
"""
return adaptor.get('accept-language')\
or adaptor.get_header('accept-language')\
or adaptor.config().DEFAULT_LANGUAGE
def parse_format(self, result_type: Type[Any], default: str) -> str:
""" Get and check the 'format' parameter and prepare the formatter.
`result_type` is the type of result to be returned by the function
and `default` the format value to assume when no parameter is present.
"""
fmt = self.get('format', default=default)
assert fmt is not None
def setup_debugging(adaptor: ASGIAdaptor) -> bool:
""" Set up collection of debug information if requested.
if not formatting.supports_format(result_type, fmt):
self.raise_error("Parameter 'format' must be one of: " +
', '.join(formatting.list_formats(result_type)))
Return True when debugging was requested.
"""
if adaptor.get_bool('debug', False):
loglib.set_log_output('html')
adaptor.content_type = CONTENT_HTML
return True
self.content_type = CONTENT_TYPE.get(fmt, CONTENT_JSON)
return fmt
return False
def parse_geometry_details(self, fmt: str) -> Dict[str, Any]:
""" Create details structure from the supplied geometry parameters.
"""
numgeoms = 0
output = GeometryFormat.NONE
if self.get_bool('polygon_geojson', False):
output |= GeometryFormat.GEOJSON
def get_layers(adaptor: ASGIAdaptor) -> Optional[DataLayer]:
""" Return a parsed version of the layer parameter.
"""
param = adaptor.get('layer', None)
if param is None:
return None
return cast(DataLayer,
reduce(DataLayer.__or__,
(getattr(DataLayer, s.upper()) for s in param.split(','))))
def parse_format(adaptor: ASGIAdaptor, result_type: Type[Any], default: str) -> str:
""" Get and check the 'format' parameter and prepare the formatter.
`result_type` is the type of result to be returned by the function
and `default` the format value to assume when no parameter is present.
"""
fmt = adaptor.get('format', default=default)
assert fmt is not None
if not formatting.supports_format(result_type, fmt):
adaptor.raise_error("Parameter 'format' must be one of: " +
', '.join(formatting.list_formats(result_type)))
adaptor.content_type = CONTENT_TYPE.get(fmt, CONTENT_JSON)
return fmt
def parse_geometry_details(adaptor: ASGIAdaptor, fmt: str) -> Dict[str, Any]:
""" Create details structure from the supplied geometry parameters.
"""
numgeoms = 0
output = GeometryFormat.NONE
if adaptor.get_bool('polygon_geojson', False):
output |= GeometryFormat.GEOJSON
numgeoms += 1
if fmt not in ('geojson', 'geocodejson'):
if adaptor.get_bool('polygon_text', False):
output |= GeometryFormat.TEXT
numgeoms += 1
if adaptor.get_bool('polygon_kml', False):
output |= GeometryFormat.KML
numgeoms += 1
if adaptor.get_bool('polygon_svg', False):
output |= GeometryFormat.SVG
numgeoms += 1
if fmt not in ('geojson', 'geocodejson'):
if self.get_bool('polygon_text', False):
output |= GeometryFormat.TEXT
numgeoms += 1
if self.get_bool('polygon_kml', False):
output |= GeometryFormat.KML
numgeoms += 1
if self.get_bool('polygon_svg', False):
output |= GeometryFormat.SVG
numgeoms += 1
if numgeoms > self.config().get_int('POLYGON_OUTPUT_MAX_TYPES'):
self.raise_error('Too many polygon output options selected.')
if numgeoms > adaptor.config().get_int('POLYGON_OUTPUT_MAX_TYPES'):
adaptor.raise_error('Too many polygon output options selected.')
return {'address_details': True,
'geometry_simplification': self.get_float('polygon_threshold', 0.0),
'geometry_output': output
}
return {'address_details': True,
'geometry_simplification': adaptor.get_float('polygon_threshold', 0.0),
'geometry_output': output
}
async def status_endpoint(api: NominatimAPIAsync, params: ASGIAdaptor) -> Any:
@ -276,21 +277,21 @@ async def status_endpoint(api: NominatimAPIAsync, params: ASGIAdaptor) -> Any:
"""
result = await api.status()
fmt = params.parse_format(StatusResult, 'text')
fmt = parse_format(params, StatusResult, 'text')
if fmt == 'text' and result.status:
status_code = 500
else:
status_code = 200
return params.build_response(formatting.format_result(result, fmt, {}),
return build_response(params, formatting.format_result(result, fmt, {}),
status=status_code)
async def details_endpoint(api: NominatimAPIAsync, params: ASGIAdaptor) -> Any:
""" Server glue for /details endpoint. See API docs for details.
"""
fmt = params.parse_format(DetailedResult, 'json')
fmt = parse_format(params, DetailedResult, 'json')
place_id = params.get_int('place_id', 0)
place: PlaceRef
if place_id:
@ -301,9 +302,9 @@ async def details_endpoint(api: NominatimAPIAsync, params: ASGIAdaptor) -> Any:
params.raise_error("Missing ID parameter 'place_id' or 'osmtype'.")
place = OsmID(osmtype, params.get_int('osmid'), params.get('class'))
debug = params.setup_debugging()
debug = setup_debugging(params)
locales = Locales.from_accept_languages(params.get_accepted_languages())
locales = Locales.from_accept_languages(get_accepted_languages(params))
result = await api.details(place,
address_details=params.get_bool('addressdetails', False),
@ -317,7 +318,7 @@ async def details_endpoint(api: NominatimAPIAsync, params: ASGIAdaptor) -> Any:
)
if debug:
return params.build_response(loglib.get_and_disable())
return build_response(params, loglib.get_and_disable())
if result is None:
params.raise_error('No place with that OSM ID found.', status=404)
@ -327,25 +328,25 @@ async def details_endpoint(api: NominatimAPIAsync, params: ASGIAdaptor) -> Any:
'group_hierarchy': params.get_bool('group_hierarchy', False),
'icon_base_url': params.config().MAPICON_URL})
return params.build_response(output, num_results=1)
return build_response(params, output, num_results=1)
async def reverse_endpoint(api: NominatimAPIAsync, params: ASGIAdaptor) -> Any:
""" Server glue for /reverse endpoint. See API docs for details.
"""
fmt = params.parse_format(ReverseResults, 'xml')
debug = params.setup_debugging()
fmt = parse_format(params, ReverseResults, 'xml')
debug = setup_debugging(params)
coord = Point(params.get_float('lon'), params.get_float('lat'))
details = params.parse_geometry_details(fmt)
details = parse_geometry_details(params, fmt)
details['max_rank'] = helpers.zoom_to_rank(params.get_int('zoom', 18))
details['layers'] = params.get_layers()
details['locales'] = Locales.from_accept_languages(params.get_accepted_languages())
details['layers'] = get_layers(params)
details['locales'] = Locales.from_accept_languages(get_accepted_languages(params))
result = await api.reverse(coord, **details)
if debug:
return params.build_response(loglib.get_and_disable(), num_results=1 if result else 0)
return build_response(params, loglib.get_and_disable(), num_results=1 if result else 0)
if fmt == 'xml':
queryparts = {'lat': str(coord.lat), 'lon': str(coord.lon), 'format': 'xml'}
@ -364,16 +365,16 @@ async def reverse_endpoint(api: NominatimAPIAsync, params: ASGIAdaptor) -> Any:
output = formatting.format_result(ReverseResults([result] if result else []),
fmt, fmt_options)
return params.build_response(output, num_results=1 if result else 0)
return build_response(params, output, num_results=1 if result else 0)
async def lookup_endpoint(api: NominatimAPIAsync, params: ASGIAdaptor) -> Any:
""" Server glue for /lookup endpoint. See API docs for details.
"""
fmt = params.parse_format(SearchResults, 'xml')
debug = params.setup_debugging()
details = params.parse_geometry_details(fmt)
details['locales'] = Locales.from_accept_languages(params.get_accepted_languages())
fmt = parse_format(params, SearchResults, 'xml')
debug = setup_debugging(params)
details = parse_geometry_details(params, fmt)
details['locales'] = Locales.from_accept_languages(get_accepted_languages(params))
places = []
for oid in (params.get('osm_ids') or '').split(','):
@ -390,7 +391,7 @@ async def lookup_endpoint(api: NominatimAPIAsync, params: ASGIAdaptor) -> Any:
results = SearchResults()
if debug:
return params.build_response(loglib.get_and_disable(), num_results=len(results))
return build_response(params, loglib.get_and_disable(), num_results=len(results))
fmt_options = {'extratags': params.get_bool('extratags', False),
'namedetails': params.get_bool('namedetails', False),
@ -398,7 +399,7 @@ async def lookup_endpoint(api: NominatimAPIAsync, params: ASGIAdaptor) -> Any:
output = formatting.format_result(results, fmt, fmt_options)
return params.build_response(output, num_results=len(results))
return build_response(params, output, num_results=len(results))
async def _unstructured_search(query: str, api: NominatimAPIAsync,
@ -435,9 +436,9 @@ async def _unstructured_search(query: str, api: NominatimAPIAsync,
async def search_endpoint(api: NominatimAPIAsync, params: ASGIAdaptor) -> Any:
""" Server glue for /search endpoint. See API docs for details.
"""
fmt = params.parse_format(SearchResults, 'jsonv2')
debug = params.setup_debugging()
details = params.parse_geometry_details(fmt)
fmt = parse_format(params, SearchResults, 'jsonv2')
debug = setup_debugging(params)
details = parse_geometry_details(params, fmt)
details['countries'] = params.get('countrycodes', None)
details['excluded'] = params.get('exclude_place_ids', None)
@ -454,9 +455,9 @@ async def search_endpoint(api: NominatimAPIAsync, params: ASGIAdaptor) -> Any:
if params.get('featureType', None) is not None:
details['layers'] = DataLayer.ADDRESS
else:
details['layers'] = params.get_layers()
details['layers'] = get_layers(params)
details['locales'] = Locales.from_accept_languages(params.get_accepted_languages())
details['locales'] = Locales.from_accept_languages(get_accepted_languages(params))
# unstructured query parameters
query = params.get('q', None)
@ -486,7 +487,7 @@ async def search_endpoint(api: NominatimAPIAsync, params: ASGIAdaptor) -> Any:
results = helpers.deduplicate_results(results, max_results)
if debug:
return params.build_response(loglib.get_and_disable(), num_results=len(results))
return build_response(params, loglib.get_and_disable(), num_results=len(results))
if fmt == 'xml':
helpers.extend_query_parts(queryparts, details,
@ -509,7 +510,7 @@ async def search_endpoint(api: NominatimAPIAsync, params: ASGIAdaptor) -> Any:
output = formatting.format_result(results, fmt, fmt_options)
return params.build_response(output, num_results=len(results))
return build_response(params, output, num_results=len(results))
async def deletable_endpoint(api: NominatimAPIAsync, params: ASGIAdaptor) -> Any:
@ -518,7 +519,7 @@ async def deletable_endpoint(api: NominatimAPIAsync, params: ASGIAdaptor) -> Any
deleted or are broken in the OSM data but are kept in the
Nominatim database to minimize disruption.
"""
fmt = params.parse_format(RawDataList, 'json')
fmt = parse_format(params, RawDataList, 'json')
async with api.begin() as conn:
sql = sa.text(""" SELECT p.place_id, country_code,
@ -529,7 +530,7 @@ async def deletable_endpoint(api: NominatimAPIAsync, params: ASGIAdaptor) -> Any
""")
results = RawDataList(r._asdict() for r in await conn.execute(sql))
return params.build_response(formatting.format_result(results, fmt, {}))
return build_response(params, formatting.format_result(results, fmt, {}))
async def polygons_endpoint(api: NominatimAPIAsync, params: ASGIAdaptor) -> Any:
@ -538,7 +539,7 @@ async def polygons_endpoint(api: NominatimAPIAsync, params: ASGIAdaptor) -> Any:
their size but are kept in the Nominatim database with their
old area to minimize disruption.
"""
fmt = params.parse_format(RawDataList, 'json')
fmt = parse_format(params, RawDataList, 'json')
sql_params: Dict[str, Any] = {
'days': params.get_int('days', -1),
'cls': params.get('class')
@ -561,7 +562,7 @@ async def polygons_endpoint(api: NominatimAPIAsync, params: ASGIAdaptor) -> Any:
results = RawDataList(r._asdict() for r in await conn.execute(sql, sql_params))
return params.build_response(formatting.format_result(results, fmt, {}))
return build_response(params, formatting.format_result(results, fmt, {}))
EndpointFunc = Callable[[NominatimAPIAsync, ASGIAdaptor], Any]

View File

@ -59,14 +59,14 @@ def test_adaptor_get_bool_falsish():
def test_adaptor_parse_format_use_default():
adaptor = FakeAdaptor()
assert adaptor.parse_format(napi.StatusResult, 'text') == 'text'
assert glue.parse_format(adaptor, napi.StatusResult, 'text') == 'text'
assert adaptor.content_type == 'text/plain; charset=utf-8'
def test_adaptor_parse_format_use_configured():
adaptor = FakeAdaptor(params={'format': 'json'})
assert adaptor.parse_format(napi.StatusResult, 'text') == 'json'
assert glue.parse_format(adaptor, napi.StatusResult, 'text') == 'json'
assert adaptor.content_type == 'application/json; charset=utf-8'
@ -74,37 +74,37 @@ def test_adaptor_parse_format_invalid_value():
adaptor = FakeAdaptor(params={'format': '@!#'})
with pytest.raises(FakeError, match='^400 -- .*must be one of'):
adaptor.parse_format(napi.StatusResult, 'text')
glue.parse_format(adaptor, napi.StatusResult, 'text')
# ASGIAdaptor.get_accepted_languages()
def test_accepted_languages_from_param():
a = FakeAdaptor(params={'accept-language': 'de'})
assert a.get_accepted_languages() == 'de'
assert glue.get_accepted_languages(a) == 'de'
def test_accepted_languages_from_header():
a = FakeAdaptor(headers={'accept-language': 'de'})
assert a.get_accepted_languages() == 'de'
assert glue.get_accepted_languages(a) == 'de'
def test_accepted_languages_from_default(monkeypatch):
monkeypatch.setenv('NOMINATIM_DEFAULT_LANGUAGE', 'de')
a = FakeAdaptor()
assert a.get_accepted_languages() == 'de'
assert glue.get_accepted_languages(a) == 'de'
def test_accepted_languages_param_over_header():
a = FakeAdaptor(params={'accept-language': 'de'},
headers={'accept-language': 'en'})
assert a.get_accepted_languages() == 'de'
assert glue.get_accepted_languages(a) == 'de'
def test_accepted_languages_header_over_default(monkeypatch):
monkeypatch.setenv('NOMINATIM_DEFAULT_LANGUAGE', 'en')
a = FakeAdaptor(headers={'accept-language': 'de'})
assert a.get_accepted_languages() == 'de'
assert glue.get_accepted_languages(a) == 'de'
# ASGIAdaptor.raise_error()
@ -114,7 +114,7 @@ class TestAdaptorRaiseError:
@pytest.fixture(autouse=True)
def init_adaptor(self):
self.adaptor = FakeAdaptor()
self.adaptor.setup_debugging()
glue.setup_debugging(self.adaptor)
def run_raise_error(self, msg, status):
with pytest.raises(FakeError) as excinfo:
@ -155,7 +155,7 @@ class TestAdaptorRaiseError:
def test_raise_error_during_debug():
a = FakeAdaptor(params={'debug': '1'})
a.setup_debugging()
glue.setup_debugging(a)
loglib.log().section('Ongoing')
with pytest.raises(FakeError) as excinfo:
@ -172,7 +172,7 @@ def test_raise_error_during_debug():
# ASGIAdaptor.build_response
def test_build_response_without_content_type():
resp = FakeAdaptor().build_response('attention')
resp = glue.build_response(FakeAdaptor(), 'attention')
assert isinstance(resp, FakeResponse)
assert resp.status == 200
@ -182,9 +182,9 @@ def test_build_response_without_content_type():
def test_build_response_with_status():
a = FakeAdaptor(params={'format': 'json'})
a.parse_format(napi.StatusResult, 'text')
glue.parse_format(a, napi.StatusResult, 'text')
resp = a.build_response('stuff\nmore stuff', status=404)
resp = glue.build_response(a, 'stuff\nmore stuff', status=404)
assert isinstance(resp, FakeResponse)
assert resp.status == 404
@ -194,9 +194,9 @@ def test_build_response_with_status():
def test_build_response_jsonp_with_json():
a = FakeAdaptor(params={'format': 'json', 'json_callback': 'test.func'})
a.parse_format(napi.StatusResult, 'text')
glue.parse_format(a, napi.StatusResult, 'text')
resp = a.build_response('{}')
resp = glue.build_response(a, '{}')
assert isinstance(resp, FakeResponse)
assert resp.status == 200
@ -206,9 +206,9 @@ def test_build_response_jsonp_with_json():
def test_build_response_jsonp_without_json():
a = FakeAdaptor(params={'format': 'text', 'json_callback': 'test.func'})
a.parse_format(napi.StatusResult, 'text')
glue.parse_format(a, napi.StatusResult, 'text')
resp = a.build_response('{}')
resp = glue.build_response(a, '{}')
assert isinstance(resp, FakeResponse)
assert resp.status == 200
@ -219,10 +219,10 @@ def test_build_response_jsonp_without_json():
@pytest.mark.parametrize('param', ['alert(); func', '\\n', '', 'a b'])
def test_build_response_jsonp_bad_format(param):
a = FakeAdaptor(params={'format': 'json', 'json_callback': param})
a.parse_format(napi.StatusResult, 'text')
glue.parse_format(a, napi.StatusResult, 'text')
with pytest.raises(FakeError, match='^400 -- .*Invalid'):
a.build_response('{}')
glue.build_response(a, '{}')
# status_endpoint()