Merge pull request #3020 from lonvia/reverse-api

Python implementation of reverse
This commit is contained in:
Sarah Hoffmann 2023-03-31 18:01:50 +02:00 committed by GitHub
commit f8bca4fbcb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
36 changed files with 3234 additions and 284 deletions

View File

@ -13,6 +13,6 @@ ignored-classes=NominatimArgs,closing
# 'too-many-ancestors' is triggered already by deriving from UserDict
# 'not-context-manager' disabled because it causes false positives once
# typed Python is enabled. See also https://github.com/PyCQA/pylint/issues/5273
disable=too-few-public-methods,duplicate-code,too-many-ancestors,bad-option-value,no-self-use,not-context-manager,use-dict-literal
disable=too-few-public-methods,duplicate-code,too-many-ancestors,bad-option-value,no-self-use,not-context-manager,use-dict-literal,chained-comparison
good-names=i,x,y,m,t,fd,db,cc
good-names=i,x,y,m,t,fd,db,cc,x1,x2,y1,y2,pt,k,v

View File

@ -21,12 +21,16 @@ from .types import (PlaceID as PlaceID,
OsmID as OsmID,
PlaceRef as PlaceRef,
Point as Point,
Bbox as Bbox,
GeometryFormat as GeometryFormat,
LookupDetails as LookupDetails)
LookupDetails as LookupDetails,
DataLayer as DataLayer)
from .results import (SourceTable as SourceTable,
AddressLine as AddressLine,
AddressLines as AddressLines,
WordInfo as WordInfo,
WordInfos as WordInfos,
SearchResult as SearchResult)
DetailedResult as DetailedResult,
ReverseResult as ReverseResult,
ReverseResults as ReverseResults)
from .localization import (Locales as Locales)

View File

@ -21,8 +21,9 @@ from nominatim.config import Configuration
from nominatim.api.connection import SearchConnection
from nominatim.api.status import get_status, StatusResult
from nominatim.api.lookup import get_place_by_id
from nominatim.api.types import PlaceRef, LookupDetails
from nominatim.api.results import SearchResult
from nominatim.api.reverse import ReverseGeocoder
from nominatim.api.types import PlaceRef, LookupDetails, AnyPoint, DataLayer
from nominatim.api.results import DetailedResult, ReverseResult
class NominatimAPIAsync:
@ -52,13 +53,16 @@ class NominatimAPIAsync:
dsn = self.config.get_database_params()
query = {k: v for k, v in dsn.items()
if k not in ('user', 'password', 'dbname', 'host', 'port')}
query['prepared_statement_cache_size'] = '0'
dburl = sa.engine.URL.create(
'postgresql+asyncpg',
database=dsn.get('dbname'),
username=dsn.get('user'), password=dsn.get('password'),
host=dsn.get('host'), port=int(dsn['port']) if 'port' in dsn else None,
query={k: v for k, v in dsn.items()
if k not in ('user', 'password', 'dbname', 'host', 'port')})
query=query)
engine = sa_asyncio.create_async_engine(
dburl, future=True,
connect_args={'server_settings': {
@ -127,13 +131,37 @@ class NominatimAPIAsync:
async def lookup(self, place: PlaceRef,
details: LookupDetails) -> Optional[SearchResult]:
details: Optional[LookupDetails] = None) -> Optional[DetailedResult]:
""" Get detailed information about a place in the database.
Returns None if there is no entry under the given ID.
"""
async with self.begin() as db:
return await get_place_by_id(db, place, details)
async with self.begin() as conn:
return await get_place_by_id(conn, place, details or LookupDetails())
async def reverse(self, coord: AnyPoint, max_rank: Optional[int] = None,
layer: Optional[DataLayer] = None,
details: Optional[LookupDetails] = None) -> Optional[ReverseResult]:
""" Find a place by its coordinates. Also known as reverse geocoding.
Returns the closest result that can be found or None if
no place matches the given criteria.
"""
# The following negation handles NaN correctly. Don't change.
if not abs(coord[0]) <= 180 or not abs(coord[1]) <= 90:
# There are no results to be expected outside valid coordinates.
return None
if layer is None:
layer = DataLayer.ADDRESS | DataLayer.POI
max_rank = max(0, min(max_rank or 30, 30))
async with self.begin() as conn:
geocoder = ReverseGeocoder(conn, max_rank, layer,
details or LookupDetails())
return await geocoder.lookup(coord)
class NominatimAPI:
@ -168,7 +196,19 @@ class NominatimAPI:
def lookup(self, place: PlaceRef,
details: LookupDetails) -> Optional[SearchResult]:
details: Optional[LookupDetails] = None) -> Optional[DetailedResult]:
""" Get detailed information about a place in the database.
"""
return self._loop.run_until_complete(self._async_api.lookup(place, details))
def reverse(self, coord: AnyPoint, max_rank: Optional[int] = None,
layer: Optional[DataLayer] = None,
details: Optional[LookupDetails] = None) -> Optional[ReverseResult]:
""" Find a place by its coordinates. Also known as reverse geocoding.
Returns the closest result that can be found or None if
no place matches the given criteria.
"""
return self._loop.run_until_complete(
self._async_api.reverse(coord, max_rank, layer, details))

View File

@ -60,6 +60,19 @@ class BaseLogger:
""" Print the SQL for the given statement.
"""
def format_sql(self, conn: AsyncConnection, statement: 'sa.Executable') -> str:
""" Return the comiled version of the statement.
"""
try:
return str(cast('sa.ClauseElement', statement)
.compile(conn.sync_engine, compile_kwargs={"literal_binds": True}))
except sa.exc.CompileError:
pass
except NotImplementedError:
pass
return str(cast('sa.ClauseElement', statement).compile(conn.sync_engine))
class HTMLLogger(BaseLogger):
""" Logger that formats messages in HTML.
@ -92,8 +105,7 @@ class HTMLLogger(BaseLogger):
def sql(self, conn: AsyncConnection, statement: 'sa.Executable') -> None:
sqlstr = str(cast('sa.ClauseElement', statement)
.compile(conn.sync_engine, compile_kwargs={"literal_binds": True}))
sqlstr = self.format_sql(conn, statement)
if CODE_HIGHLIGHT:
sqlstr = highlight(sqlstr, PostgresLexer(),
HtmlFormatter(nowrap=True, lineseparator='<br />'))
@ -147,9 +159,7 @@ class TextLogger(BaseLogger):
def sql(self, conn: AsyncConnection, statement: 'sa.Executable') -> None:
sqlstr = str(cast('sa.ClauseElement', statement)
.compile(conn.sync_engine, compile_kwargs={"literal_binds": True}))
sqlstr = '\n| '.join(textwrap.wrap(sqlstr, width=78))
sqlstr = '\n| '.join(textwrap.wrap(self.format_sql(conn, statement), width=78))
self._write(f"| {sqlstr}\n\n")

View File

@ -8,6 +8,7 @@
Implementation of place lookup by ID.
"""
from typing import Optional
import datetime as dt
import sqlalchemy as sa
@ -101,14 +102,17 @@ async def find_in_tiger(conn: SearchConnection, place: ntyp.PlaceRef,
"""
log().section("Find in TIGER table")
t = conn.t.tiger
parent = conn.t.placex
sql = sa.select(t.c.place_id, t.c.parent_place_id,
parent.c.osm_type, parent.c.osm_id,
t.c.startnumber, t.c.endnumber, t.c.step,
t.c.postcode,
t.c.linegeo.ST_Centroid().label('centroid'),
_select_column_geometry(t.c.linegeo, details.geometry_output))
if isinstance(place, ntyp.PlaceID):
sql = sql.where(t.c.place_id == place.place_id)
sql = sql.where(t.c.place_id == place.place_id)\
.join(parent, t.c.parent_place_id == parent.c.place_id, isouter=True)
else:
return None
@ -137,7 +141,7 @@ async def find_in_postcode(conn: SearchConnection, place: ntyp.PlaceRef,
async def get_place_by_id(conn: SearchConnection, place: ntyp.PlaceRef,
details: ntyp.LookupDetails) -> Optional[nres.SearchResult]:
details: ntyp.LookupDetails) -> Optional[nres.DetailedResult]:
""" Retrieve a place with additional details from the database.
"""
log().function('get_place_by_id', place=place, details=details)
@ -146,32 +150,36 @@ async def get_place_by_id(conn: SearchConnection, place: ntyp.PlaceRef,
raise ValueError("lookup only supports geojosn polygon output.")
row = await find_in_placex(conn, place, details)
log().var_dump('Result (placex)', row)
if row is not None:
result = nres.create_from_placex_row(row)
log().var_dump('Result', result)
await nres.add_result_details(conn, result, details)
return result
result = nres.create_from_placex_row(row, nres.DetailedResult)
else:
row = await find_in_osmline(conn, place, details)
log().var_dump('Result (osmline)', row)
if row is not None:
result = nres.create_from_osmline_row(row, nres.DetailedResult)
else:
row = await find_in_postcode(conn, place, details)
log().var_dump('Result (postcode)', row)
if row is not None:
result = nres.create_from_postcode_row(row, nres.DetailedResult)
else:
row = await find_in_tiger(conn, place, details)
log().var_dump('Result (tiger)', row)
if row is not None:
result = nres.create_from_tiger_row(row, nres.DetailedResult)
else:
return None
row = await find_in_osmline(conn, place, details)
if row is not None:
result = nres.create_from_osmline_row(row)
log().var_dump('Result', result)
await nres.add_result_details(conn, result, details)
return result
# add missing details
assert result is not None
result.parent_place_id = row.parent_place_id
result.linked_place_id = getattr(row, 'linked_place_id', None)
result.admin_level = getattr(row, 'admin_level', 15)
indexed_date = getattr(row, 'indexed_date', None)
if indexed_date is not None:
result.indexed_date = indexed_date.replace(tzinfo=dt.timezone.utc)
row = await find_in_postcode(conn, place, details)
if row is not None:
result = nres.create_from_postcode_row(row)
log().var_dump('Result', result)
await nres.add_result_details(conn, result, details)
return result
await nres.add_result_details(conn, result, details)
row = await find_in_tiger(conn, place, details)
if row is not None:
result = nres.create_from_tiger_row(row)
log().var_dump('Result', result)
await nres.add_result_details(conn, result, details)
return result
# Nothing found under this ID.
return None
return result

View File

@ -11,7 +11,7 @@ Data classes are part of the public API while the functions are for
internal use only. That's why they are implemented as free-standing functions
instead of member functions.
"""
from typing import Optional, Tuple, Dict, Sequence
from typing import Optional, Tuple, Dict, Sequence, TypeVar, Type, List
import enum
import dataclasses
import datetime as dt
@ -19,9 +19,10 @@ import datetime as dt
import sqlalchemy as sa
from nominatim.typing import SaSelect, SaRow
from nominatim.api.types import Point, LookupDetails
from nominatim.api.types import Point, Bbox, LookupDetails
from nominatim.api.connection import SearchConnection
from nominatim.api.logging import log
from nominatim.api.localization import Locales
# This file defines complex result data classes.
# pylint: disable=too-many-instance-attributes
@ -52,8 +53,30 @@ class AddressLine:
rank_address: int
distance: float
local_name: Optional[str] = None
class AddressLines(List[AddressLine]):
""" Sequence of address lines order in descending order by their rank.
"""
def localize(self, locales: Locales) -> List[str]:
""" Set the local name of address parts according to the chosen
locale. Return the list of local names without duplications.
Only address parts that are marked as isaddress are localized
and returned.
"""
label_parts: List[str] = []
for line in self:
if line.isaddress and line.names:
line.local_name = locales.display_name(line.names)
if not label_parts or label_parts[-1] != line.local_name:
label_parts.append(line.local_name)
return label_parts
AddressLines = Sequence[AddressLine]
@dataclasses.dataclass
@ -69,18 +92,16 @@ WordInfos = Sequence[WordInfo]
@dataclasses.dataclass
class SearchResult:
""" Data class collecting all available information about a search result.
class BaseResult:
""" Data class collecting information common to all
types of search results.
"""
source_table: SourceTable
category: Tuple[str, str]
centroid: Point
place_id : Optional[int] = None
parent_place_id: Optional[int] = None
linked_place_id: Optional[int] = None
osm_object: Optional[Tuple[str, int]] = None
admin_level: int = 15
names: Optional[Dict[str, str]] = None
address: Optional[Dict[str, str]] = None
@ -96,8 +117,6 @@ class SearchResult:
country_code: Optional[str] = None
indexed_date: Optional[dt.datetime] = None
address_rows: Optional[AddressLines] = None
linked_rows: Optional[AddressLines] = None
parented_rows: Optional[AddressLines] = None
@ -106,10 +125,6 @@ class SearchResult:
geometry: Dict[str, str] = dataclasses.field(default_factory=dict)
def __post_init__(self) -> None:
if self.indexed_date is not None and self.indexed_date.tzinfo is None:
self.indexed_date = self.indexed_date.replace(tzinfo=dt.timezone.utc)
@property
def lat(self) -> float:
""" Get the latitude (or y) of the center point of the place.
@ -131,93 +146,153 @@ class SearchResult:
"""
return self.importance or (0.7500001 - (self.rank_search/40.0))
BaseResultT = TypeVar('BaseResultT', bound=BaseResult)
@dataclasses.dataclass
class DetailedResult(BaseResult):
""" A search result with more internal information from the database
added.
"""
parent_place_id: Optional[int] = None
linked_place_id: Optional[int] = None
admin_level: int = 15
indexed_date: Optional[dt.datetime] = None
@dataclasses.dataclass
class ReverseResult(BaseResult):
""" A search result for reverse geocoding.
"""
distance: Optional[float] = None
bbox: Optional[Bbox] = None
class ReverseResults(List[ReverseResult]):
""" Sequence of reverse lookup results ordered by distance.
May be empty when no result was found.
"""
def _filter_geometries(row: SaRow) -> Dict[str, str]:
return {k[9:]: v for k, v in row._mapping.items() # pylint: disable=W0212
if k.startswith('geometry_')}
def create_from_placex_row(row: SaRow) -> SearchResult:
""" Construct a new SearchResult and add the data from the result row
from the placex table.
def create_from_placex_row(row: Optional[SaRow],
class_type: Type[BaseResultT]) -> Optional[BaseResultT]:
""" Construct a new result and add the data from the result row
from the placex table. 'class_type' defines the type of result
to return. Returns None if the row is None.
"""
return SearchResult(source_table=SourceTable.PLACEX,
place_id=row.place_id,
parent_place_id=row.parent_place_id,
linked_place_id=row.linked_place_id,
osm_object=(row.osm_type, row.osm_id),
category=(row.class_, row.type),
admin_level=row.admin_level,
names=row.name,
address=row.address,
extratags=row.extratags,
housenumber=row.housenumber,
postcode=row.postcode,
wikipedia=row.wikipedia,
rank_address=row.rank_address,
rank_search=row.rank_search,
importance=row.importance,
country_code=row.country_code,
indexed_date=getattr(row, 'indexed_date'),
centroid=Point.from_wkb(row.centroid.data),
geometry=_filter_geometries(row))
if row is None:
return None
return class_type(source_table=SourceTable.PLACEX,
place_id=row.place_id,
osm_object=(row.osm_type, row.osm_id),
category=(row.class_, row.type),
names=row.name,
address=row.address,
extratags=row.extratags,
housenumber=row.housenumber,
postcode=row.postcode,
wikipedia=row.wikipedia,
rank_address=row.rank_address,
rank_search=row.rank_search,
importance=row.importance,
country_code=row.country_code,
centroid=Point.from_wkb(row.centroid.data),
geometry=_filter_geometries(row))
def create_from_osmline_row(row: SaRow) -> SearchResult:
""" Construct a new SearchResult and add the data from the result row
from the osmline table.
def create_from_osmline_row(row: Optional[SaRow],
class_type: Type[BaseResultT]) -> Optional[BaseResultT]:
""" Construct a new result and add the data from the result row
from the address interpolation table osmline. 'class_type' defines
the type of result to return. Returns None if the row is None.
If the row contains a housenumber, then the housenumber is filled out.
Otherwise the result contains the interpolation information in extratags.
"""
return SearchResult(source_table=SourceTable.OSMLINE,
place_id=row.place_id,
parent_place_id=row.parent_place_id,
osm_object=('W', row.osm_id),
category=('place', 'houses'),
address=row.address,
postcode=row.postcode,
extratags={'startnumber': str(row.startnumber),
'endnumber': str(row.endnumber),
'step': str(row.step)},
country_code=row.country_code,
indexed_date=getattr(row, 'indexed_date'),
centroid=Point.from_wkb(row.centroid.data),
geometry=_filter_geometries(row))
if row is None:
return None
hnr = getattr(row, 'housenumber', None)
res = class_type(source_table=SourceTable.OSMLINE,
place_id=row.place_id,
osm_object=('W', row.osm_id),
category=('place', 'houses' if hnr is None else 'house'),
address=row.address,
postcode=row.postcode,
country_code=row.country_code,
centroid=Point.from_wkb(row.centroid.data),
geometry=_filter_geometries(row))
if hnr is None:
res.extratags = {'startnumber': str(row.startnumber),
'endnumber': str(row.endnumber),
'step': str(row.step)}
else:
res.housenumber = str(hnr)
return res
def create_from_tiger_row(row: SaRow) -> SearchResult:
""" Construct a new SearchResult and add the data from the result row
from the Tiger table.
def create_from_tiger_row(row: Optional[SaRow],
class_type: Type[BaseResultT]) -> Optional[BaseResultT]:
""" Construct a new result and add the data from the result row
from the Tiger data interpolation table. 'class_type' defines
the type of result to return. Returns None if the row is None.
If the row contains a housenumber, then the housenumber is filled out.
Otherwise the result contains the interpolation information in extratags.
"""
return SearchResult(source_table=SourceTable.TIGER,
place_id=row.place_id,
parent_place_id=row.parent_place_id,
category=('place', 'houses'),
postcode=row.postcode,
extratags={'startnumber': str(row.startnumber),
'endnumber': str(row.endnumber),
'step': str(row.step)},
country_code='us',
centroid=Point.from_wkb(row.centroid.data),
geometry=_filter_geometries(row))
if row is None:
return None
hnr = getattr(row, 'housenumber', None)
res = class_type(source_table=SourceTable.TIGER,
place_id=row.place_id,
osm_object=(row.osm_type, row.osm_id),
category=('place', 'houses' if hnr is None else 'house'),
postcode=row.postcode,
country_code='us',
centroid=Point.from_wkb(row.centroid.data),
geometry=_filter_geometries(row))
if hnr is None:
res.extratags = {'startnumber': str(row.startnumber),
'endnumber': str(row.endnumber),
'step': str(row.step)}
else:
res.housenumber = str(hnr)
return res
def create_from_postcode_row(row: SaRow) -> SearchResult:
""" Construct a new SearchResult and add the data from the result row
from the postcode centroid table.
def create_from_postcode_row(row: Optional[SaRow],
class_type: Type[BaseResultT]) -> Optional[BaseResultT]:
""" Construct a new result and add the data from the result row
from the postcode table. 'class_type' defines
the type of result to return. Returns None if the row is None.
"""
return SearchResult(source_table=SourceTable.POSTCODE,
place_id=row.place_id,
parent_place_id=row.parent_place_id,
category=('place', 'postcode'),
names={'ref': row.postcode},
rank_search=row.rank_search,
rank_address=row.rank_address,
country_code=row.country_code,
centroid=Point.from_wkb(row.centroid.data),
indexed_date=row.indexed_date,
geometry=_filter_geometries(row))
if row is None:
return None
return class_type(source_table=SourceTable.POSTCODE,
place_id=row.place_id,
category=('place', 'postcode'),
names={'ref': row.postcode},
rank_search=row.rank_search,
rank_address=row.rank_address,
country_code=row.country_code,
centroid=Point.from_wkb(row.centroid.data),
geometry=_filter_geometries(row))
async def add_result_details(conn: SearchConnection, result: SearchResult,
async def add_result_details(conn: SearchConnection, result: BaseResult,
details: LookupDetails) -> None:
""" Retrieve more details from the database according to the
parameters specified in 'details'.
@ -241,8 +316,8 @@ def _result_row_to_address_row(row: SaRow) -> AddressLine:
""" Create a new AddressLine from the results of a datbase query.
"""
extratags: Dict[str, str] = getattr(row, 'extratags', {})
if 'place_type' in row:
extratags['place_type'] = row.place_type
if hasattr(row, 'place_type') and row.place_type:
extratags['place'] = row.place_type
names = row.name
if getattr(row, 'housenumber', None) is not None:
@ -262,7 +337,7 @@ def _result_row_to_address_row(row: SaRow) -> AddressLine:
distance=row.distance)
async def complete_address_details(conn: SearchConnection, result: SearchResult) -> None:
async def complete_address_details(conn: SearchConnection, result: BaseResult) -> None:
""" Retrieve information about places that make up the address of the result.
"""
housenumber = -1
@ -288,10 +363,11 @@ async def complete_address_details(conn: SearchConnection, result: SearchResult)
sql = sa.select(sfn).order_by(sa.column('rank_address').desc(),
sa.column('isaddress').desc())
result.address_rows = []
result.address_rows = AddressLines()
for row in await conn.execute(sql):
result.address_rows.append(_result_row_to_address_row(row))
# pylint: disable=consider-using-f-string
def _placex_select_address_row(conn: SearchConnection,
centroid: Point) -> SaSelect:
@ -308,10 +384,10 @@ def _placex_select_address_row(conn: SearchConnection,
""" % centroid).label('distance'))
async def complete_linked_places(conn: SearchConnection, result: SearchResult) -> None:
async def complete_linked_places(conn: SearchConnection, result: BaseResult) -> None:
""" Retrieve information about places that link to the result.
"""
result.linked_rows = []
result.linked_rows = AddressLines()
if result.source_table != SourceTable.PLACEX:
return
@ -322,7 +398,7 @@ async def complete_linked_places(conn: SearchConnection, result: SearchResult) -
result.linked_rows.append(_result_row_to_address_row(row))
async def complete_keywords(conn: SearchConnection, result: SearchResult) -> None:
async def complete_keywords(conn: SearchConnection, result: BaseResult) -> None:
""" Retrieve information about the search terms used for this place.
"""
t = conn.t.search_name
@ -342,11 +418,11 @@ async def complete_keywords(conn: SearchConnection, result: SearchResult) -> Non
result.address_keywords.append(WordInfo(*row))
async def complete_parented_places(conn: SearchConnection, result: SearchResult) -> None:
async def complete_parented_places(conn: SearchConnection, result: BaseResult) -> None:
""" Retrieve information about places that the result provides the
address for.
"""
result.parented_rows = []
result.parented_rows = AddressLines()
if result.source_table != SourceTable.PLACEX:
return

544
nominatim/api/reverse.py Normal file
View File

@ -0,0 +1,544 @@
# SPDX-License-Identifier: GPL-3.0-or-later
#
# This file is part of Nominatim. (https://nominatim.org)
#
# Copyright (C) 2023 by the Nominatim developer community.
# For a full list of authors see the git log.
"""
Implementation of reverse geocoding.
"""
from typing import Optional, List, Callable, Type, Tuple
import sqlalchemy as sa
from geoalchemy2 import WKTElement
from nominatim.typing import SaColumn, SaSelect, SaFromClause, SaLabel, SaRow
from nominatim.api.connection import SearchConnection
import nominatim.api.results as nres
from nominatim.api.logging import log
from nominatim.api.types import AnyPoint, DataLayer, LookupDetails, GeometryFormat, Bbox
# In SQLAlchemy expression which compare with NULL need to be expressed with
# the equal sign.
# pylint: disable=singleton-comparison
RowFunc = Callable[[Optional[SaRow], Type[nres.ReverseResult]], Optional[nres.ReverseResult]]
def _select_from_placex(t: SaFromClause, wkt: Optional[str] = None) -> SaSelect:
""" Create a select statement with the columns relevant for reverse
results.
"""
if wkt is None:
distance = t.c.distance
centroid = t.c.centroid
else:
distance = t.c.geometry.ST_Distance(wkt)
centroid = sa.case(
(t.c.geometry.ST_GeometryType().in_(('ST_LineString',
'ST_MultiLineString')),
t.c.geometry.ST_ClosestPoint(wkt)),
else_=t.c.centroid).label('centroid')
return sa.select(t.c.place_id, t.c.osm_type, t.c.osm_id, t.c.name,
t.c.class_, t.c.type,
t.c.address, t.c.extratags,
t.c.housenumber, t.c.postcode, t.c.country_code,
t.c.importance, t.c.wikipedia,
t.c.parent_place_id, t.c.rank_address, t.c.rank_search,
centroid,
distance.label('distance'),
t.c.geometry.ST_Expand(0).label('bbox'))
def _interpolated_housenumber(table: SaFromClause) -> SaLabel:
return sa.cast(table.c.startnumber
+ sa.func.round(((table.c.endnumber - table.c.startnumber) * table.c.position)
/ table.c.step) * table.c.step,
sa.Integer).label('housenumber')
def _interpolated_position(table: SaFromClause) -> SaLabel:
fac = sa.cast(table.c.step, sa.Float) / (table.c.endnumber - table.c.startnumber)
rounded_pos = sa.func.round(table.c.position / fac) * fac
return sa.case(
(table.c.endnumber == table.c.startnumber, table.c.linegeo.ST_Centroid()),
else_=table.c.linegeo.ST_LineInterpolatePoint(rounded_pos)).label('centroid')
def _locate_interpolation(table: SaFromClause, wkt: WKTElement) -> SaLabel:
""" Given a position, locate the closest point on the line.
"""
return sa.case((table.c.linegeo.ST_GeometryType() == 'ST_LineString',
sa.func.ST_LineLocatePoint(table.c.linegeo, wkt)),
else_=0).label('position')
def _is_address_point(table: SaFromClause) -> SaColumn:
return sa.and_(table.c.rank_address == 30,
sa.or_(table.c.housenumber != None,
table.c.name.has_key('housename')))
def _get_closest(*rows: Optional[SaRow]) -> Optional[SaRow]:
return min(rows, key=lambda row: 1000 if row is None else row.distance)
class ReverseGeocoder:
""" Class implementing the logic for looking up a place from a
coordinate.
"""
def __init__(self, conn: SearchConnection, max_rank: int, layer: DataLayer,
details: LookupDetails) -> None:
self.conn = conn
self.max_rank = max_rank
self.layer = layer
self.details = details
def layer_enabled(self, *layer: DataLayer) -> bool:
""" Return true when any of the given layer types are requested.
"""
return any(self.layer & l for l in layer)
def layer_disabled(self, *layer: DataLayer) -> bool:
""" Return true when none of the given layer types is requested.
"""
return not any(self.layer & l for l in layer)
def has_feature_layers(self) -> bool:
""" Return true if any layer other than ADDRESS or POI is requested.
"""
return self.layer_enabled(DataLayer.RAILWAY, DataLayer.MANMADE, DataLayer.NATURAL)
def _add_geometry_columns(self, sql: SaSelect, col: SaColumn) -> SaSelect:
if not self.details.geometry_output:
return sql
out = []
if self.details.geometry_simplification > 0.0:
col = col.ST_SimplifyPreserveTopology(self.details.geometry_simplification)
if self.details.geometry_output & GeometryFormat.GEOJSON:
out.append(col.ST_AsGeoJSON().label('geometry_geojson'))
if self.details.geometry_output & GeometryFormat.TEXT:
out.append(col.ST_AsText().label('geometry_text'))
if self.details.geometry_output & GeometryFormat.KML:
out.append(col.ST_AsKML().label('geometry_kml'))
if self.details.geometry_output & GeometryFormat.SVG:
out.append(col.ST_AsSVG().label('geometry_svg'))
return sql.add_columns(*out)
def _filter_by_layer(self, table: SaFromClause) -> SaColumn:
if self.layer_enabled(DataLayer.MANMADE):
exclude = []
if self.layer_disabled(DataLayer.RAILWAY):
exclude.append('railway')
if self.layer_disabled(DataLayer.NATURAL):
exclude.extend(('natural', 'water', 'waterway'))
return table.c.class_.not_in(tuple(exclude))
include = []
if self.layer_enabled(DataLayer.RAILWAY):
include.append('railway')
if self.layer_enabled(DataLayer.NATURAL):
include.extend(('natural', 'water', 'waterway'))
return table.c.class_.in_(tuple(include))
async def _find_closest_street_or_poi(self, wkt: WKTElement,
distance: float) -> Optional[SaRow]:
""" Look up the closest rank 26+ place in the database, which
is closer than the given distance.
"""
t = self.conn.t.placex
sql = _select_from_placex(t, wkt)\
.where(t.c.geometry.ST_DWithin(wkt, distance))\
.where(t.c.indexed_status == 0)\
.where(t.c.linked_place_id == None)\
.where(sa.or_(t.c.geometry.ST_GeometryType()
.not_in(('ST_Polygon', 'ST_MultiPolygon')),
t.c.centroid.ST_Distance(wkt) < distance))\
.order_by('distance')\
.limit(1)
sql = self._add_geometry_columns(sql, t.c.geometry)
restrict: List[SaColumn] = []
if self.layer_enabled(DataLayer.ADDRESS):
restrict.append(sa.and_(t.c.rank_address >= 26,
t.c.rank_address <= min(29, self.max_rank)))
if self.max_rank == 30:
restrict.append(_is_address_point(t))
if self.layer_enabled(DataLayer.POI) and self.max_rank == 30:
restrict.append(sa.and_(t.c.rank_search == 30,
t.c.class_.not_in(('place', 'building')),
t.c.geometry.ST_GeometryType() != 'ST_LineString'))
if self.has_feature_layers():
restrict.append(sa.and_(t.c.rank_search.between(26, self.max_rank),
t.c.rank_address == 0,
self._filter_by_layer(t)))
if not restrict:
return None
return (await self.conn.execute(sql.where(sa.or_(*restrict)))).one_or_none()
async def _find_housenumber_for_street(self, parent_place_id: int,
wkt: WKTElement) -> Optional[SaRow]:
t = self.conn.t.placex
sql = _select_from_placex(t, wkt)\
.where(t.c.geometry.ST_DWithin(wkt, 0.001))\
.where(t.c.parent_place_id == parent_place_id)\
.where(_is_address_point(t))\
.where(t.c.indexed_status == 0)\
.where(t.c.linked_place_id == None)\
.order_by('distance')\
.limit(1)
sql = self._add_geometry_columns(sql, t.c.geometry)
return (await self.conn.execute(sql)).one_or_none()
async def _find_interpolation_for_street(self, parent_place_id: Optional[int],
wkt: WKTElement,
distance: float) -> Optional[SaRow]:
t = self.conn.t.osmline
sql = sa.select(t,
t.c.linegeo.ST_Distance(wkt).label('distance'),
_locate_interpolation(t, wkt))\
.where(t.c.linegeo.ST_DWithin(wkt, distance))\
.where(t.c.startnumber != None)\
.order_by('distance')\
.limit(1)
if parent_place_id is not None:
sql = sql.where(t.c.parent_place_id == parent_place_id)
inner = sql.subquery()
sql = sa.select(inner.c.place_id, inner.c.osm_id,
inner.c.parent_place_id, inner.c.address,
_interpolated_housenumber(inner),
_interpolated_position(inner),
inner.c.postcode, inner.c.country_code,
inner.c.distance)
if self.details.geometry_output:
sub = sql.subquery()
sql = self._add_geometry_columns(sql, sub.c.centroid)
return (await self.conn.execute(sql)).one_or_none()
async def _find_tiger_number_for_street(self, parent_place_id: int,
parent_type: str, parent_id: int,
wkt: WKTElement) -> Optional[SaRow]:
t = self.conn.t.tiger
inner = sa.select(t,
t.c.linegeo.ST_Distance(wkt).label('distance'),
_locate_interpolation(t, wkt))\
.where(t.c.linegeo.ST_DWithin(wkt, 0.001))\
.where(t.c.parent_place_id == parent_place_id)\
.order_by('distance')\
.limit(1)\
.subquery()
sql = sa.select(inner.c.place_id,
inner.c.parent_place_id,
sa.literal(parent_type).label('osm_type'),
sa.literal(parent_id).label('osm_id'),
_interpolated_housenumber(inner),
_interpolated_position(inner),
inner.c.postcode,
inner.c.distance)
if self.details.geometry_output:
sub = sql.subquery()
sql = self._add_geometry_columns(sql, sub.c.centroid)
return (await self.conn.execute(sql)).one_or_none()
async def lookup_street_poi(self,
wkt: WKTElement) -> Tuple[Optional[SaRow], RowFunc]:
""" Find a street or POI/address for the given WKT point.
"""
log().section('Reverse lookup on street/address level')
distance = 0.006
parent_place_id = None
row = await self._find_closest_street_or_poi(wkt, distance)
row_func: RowFunc = nres.create_from_placex_row
log().var_dump('Result (street/building)', row)
# If the closest result was a street, but an address was requested,
# check for a housenumber nearby which is part of the street.
if row is not None:
if self.max_rank > 27 \
and self.layer_enabled(DataLayer.ADDRESS) \
and row.rank_address <= 27:
distance = 0.001
parent_place_id = row.place_id
log().comment('Find housenumber for street')
addr_row = await self._find_housenumber_for_street(parent_place_id, wkt)
log().var_dump('Result (street housenumber)', addr_row)
if addr_row is not None:
row = addr_row
row_func = nres.create_from_placex_row
distance = addr_row.distance
elif row.country_code == 'us' and parent_place_id is not None:
log().comment('Find TIGER housenumber for street')
addr_row = await self._find_tiger_number_for_street(parent_place_id,
row.osm_type,
row.osm_id,
wkt)
log().var_dump('Result (street Tiger housenumber)', addr_row)
if addr_row is not None:
row = addr_row
row_func = nres.create_from_tiger_row
else:
distance = row.distance
# Check for an interpolation that is either closer than our result
# or belongs to a close street found.
if self.max_rank > 27 and self.layer_enabled(DataLayer.ADDRESS):
log().comment('Find interpolation for street')
addr_row = await self._find_interpolation_for_street(parent_place_id,
wkt, distance)
log().var_dump('Result (street interpolation)', addr_row)
if addr_row is not None:
row = addr_row
row_func = nres.create_from_osmline_row
return row, row_func
async def _lookup_area_address(self, wkt: WKTElement) -> Optional[SaRow]:
""" Lookup large addressable areas for the given WKT point.
"""
log().comment('Reverse lookup by larger address area features')
t = self.conn.t.placex
# The inner SQL brings results in the right order, so that
# later only a minimum of results needs to be checked with ST_Contains.
inner = sa.select(t, sa.literal(0.0).label('distance'))\
.where(t.c.rank_search.between(5, self.max_rank))\
.where(t.c.rank_address.between(5, 25))\
.where(t.c.geometry.ST_GeometryType().in_(('ST_Polygon', 'ST_MultiPolygon')))\
.where(t.c.geometry.intersects(wkt))\
.where(t.c.name != None)\
.where(t.c.indexed_status == 0)\
.where(t.c.linked_place_id == None)\
.where(t.c.type != 'postcode')\
.order_by(sa.desc(t.c.rank_search))\
.limit(50)\
.subquery()
sql = _select_from_placex(inner)\
.where(inner.c.geometry.ST_Contains(wkt))\
.order_by(sa.desc(inner.c.rank_search))\
.limit(1)
sql = self._add_geometry_columns(sql, inner.c.geometry)
address_row = (await self.conn.execute(sql)).one_or_none()
log().var_dump('Result (area)', address_row)
if address_row is not None and address_row.rank_search < self.max_rank:
log().comment('Search for better matching place nodes inside the area')
inner = sa.select(t,
t.c.geometry.ST_Distance(wkt).label('distance'))\
.where(t.c.osm_type == 'N')\
.where(t.c.rank_search > address_row.rank_search)\
.where(t.c.rank_search <= self.max_rank)\
.where(t.c.rank_address.between(5, 25))\
.where(t.c.name != None)\
.where(t.c.indexed_status == 0)\
.where(t.c.linked_place_id == None)\
.where(t.c.type != 'postcode')\
.where(t.c.geometry
.ST_Buffer(sa.func.reverse_place_diameter(t.c.rank_search))
.intersects(wkt))\
.order_by(sa.desc(t.c.rank_search))\
.limit(50)\
.subquery()
touter = self.conn.t.placex.alias('outer')
sql = _select_from_placex(inner)\
.join(touter, touter.c.geometry.ST_Contains(inner.c.geometry))\
.where(touter.c.place_id == address_row.place_id)\
.where(inner.c.distance < sa.func.reverse_place_diameter(inner.c.rank_search))\
.order_by(sa.desc(inner.c.rank_search), inner.c.distance)\
.limit(1)
sql = self._add_geometry_columns(sql, inner.c.geometry)
place_address_row = (await self.conn.execute(sql)).one_or_none()
log().var_dump('Result (place node)', place_address_row)
if place_address_row is not None:
return place_address_row
return address_row
async def _lookup_area_others(self, wkt: WKTElement) -> Optional[SaRow]:
t = self.conn.t.placex
inner = sa.select(t, t.c.geometry.ST_Distance(wkt).label('distance'))\
.where(t.c.rank_address == 0)\
.where(t.c.rank_search.between(5, self.max_rank))\
.where(t.c.name != None)\
.where(t.c.indexed_status == 0)\
.where(t.c.linked_place_id == None)\
.where(self._filter_by_layer(t))\
.where(t.c.geometry
.ST_Buffer(sa.func.reverse_place_diameter(t.c.rank_search))
.intersects(wkt))\
.order_by(sa.desc(t.c.rank_search))\
.limit(50)\
.subquery()
sql = _select_from_placex(inner)\
.where(sa.or_(inner.c.geometry.ST_GeometryType()
.not_in(('ST_Polygon', 'ST_MultiPolygon')),
inner.c.geometry.ST_Contains(wkt)))\
.order_by(sa.desc(inner.c.rank_search), inner.c.distance)\
.limit(1)
sql = self._add_geometry_columns(sql, inner.c.geometry)
row = (await self.conn.execute(sql)).one_or_none()
log().var_dump('Result (non-address feature)', row)
return row
async def lookup_area(self, wkt: WKTElement) -> Optional[SaRow]:
""" Lookup large areas for the given WKT point.
"""
log().section('Reverse lookup by larger area features')
if self.layer_enabled(DataLayer.ADDRESS):
address_row = await self._lookup_area_address(wkt)
else:
address_row = None
if self.has_feature_layers():
other_row = await self._lookup_area_others(wkt)
else:
other_row = None
return _get_closest(address_row, other_row)
async def lookup_country(self, wkt: WKTElement) -> Optional[SaRow]:
""" Lookup the country for the given WKT point.
"""
log().section('Reverse lookup by country code')
t = self.conn.t.country_grid
sql = sa.select(t.c.country_code).distinct()\
.where(t.c.geometry.ST_Contains(wkt))
ccodes = tuple((r[0] for r in await self.conn.execute(sql)))
log().var_dump('Country codes', ccodes)
if not ccodes:
return None
t = self.conn.t.placex
if self.max_rank > 4:
log().comment('Search for place nodes in country')
inner = sa.select(t,
t.c.geometry.ST_Distance(wkt).label('distance'))\
.where(t.c.osm_type == 'N')\
.where(t.c.rank_search > 4)\
.where(t.c.rank_search <= self.max_rank)\
.where(t.c.rank_address.between(5, 25))\
.where(t.c.name != None)\
.where(t.c.indexed_status == 0)\
.where(t.c.linked_place_id == None)\
.where(t.c.type != 'postcode')\
.where(t.c.country_code.in_(ccodes))\
.where(t.c.geometry
.ST_Buffer(sa.func.reverse_place_diameter(t.c.rank_search))
.intersects(wkt))\
.order_by(sa.desc(t.c.rank_search))\
.limit(50)\
.subquery()
sql = _select_from_placex(inner)\
.where(inner.c.distance < sa.func.reverse_place_diameter(inner.c.rank_search))\
.order_by(sa.desc(inner.c.rank_search), inner.c.distance)\
.limit(1)
sql = self._add_geometry_columns(sql, inner.c.geometry)
address_row = (await self.conn.execute(sql)).one_or_none()
log().var_dump('Result (addressable place node)', address_row)
else:
address_row = None
if address_row is None:
# Still nothing, then return a country with the appropriate country code.
sql = _select_from_placex(t, wkt)\
.where(t.c.country_code.in_(ccodes))\
.where(t.c.rank_address == 4)\
.where(t.c.rank_search == 4)\
.where(t.c.linked_place_id == None)\
.order_by('distance')\
.limit(1)
sql = self._add_geometry_columns(sql, t.c.geometry)
address_row = (await self.conn.execute(sql)).one_or_none()
return address_row
async def lookup(self, coord: AnyPoint) -> Optional[nres.ReverseResult]:
""" Look up a single coordinate. Returns the place information,
if a place was found near the coordinates or None otherwise.
"""
log().function('reverse_lookup',
coord=coord, max_rank=self.max_rank,
layer=self.layer, details=self.details)
wkt = WKTElement(f'POINT({coord[0]} {coord[1]})', srid=4326)
row: Optional[SaRow] = None
row_func: RowFunc = nres.create_from_placex_row
if self.max_rank >= 26:
row, tmp_row_func = await self.lookup_street_poi(wkt)
if row is not None:
row_func = tmp_row_func
if row is None and self.max_rank > 4:
row = await self.lookup_area(wkt)
if row is None and self.layer_enabled(DataLayer.ADDRESS):
row = await self.lookup_country(wkt)
result = row_func(row, nres.ReverseResult)
if result is not None:
assert row is not None
result.distance = row.distance
if hasattr(row, 'bbox'):
result.bbox = Bbox.from_wkb(row.bbox.data)
await nres.add_result_details(self.conn, result, self.details)
return result

View File

@ -7,7 +7,7 @@
"""
Complex datatypes used by the Nominatim API.
"""
from typing import Optional, Union, NamedTuple
from typing import Optional, Union, Tuple, NamedTuple
import dataclasses
import enum
from struct import unpack
@ -83,6 +83,77 @@ class Point(NamedTuple):
return Point(x, y)
AnyPoint = Union[Point, Tuple[float, float]]
WKB_BBOX_HEADER_LE = b'\x01\x03\x00\x00\x20\xE6\x10\x00\x00\x01\x00\x00\x00\x05\x00\x00\x00'
WKB_BBOX_HEADER_BE = b'\x00\x20\x00\x00\x03\x00\x00\x10\xe6\x00\x00\x00\x01\x00\x00\x00\x05'
class Bbox:
""" A bounding box in WSG84 projection.
The coordinates are available as an array in the 'coord'
property in the order (minx, miny, maxx, maxy).
"""
def __init__(self, minx: float, miny: float, maxx: float, maxy: float) -> None:
self.coords = (minx, miny, maxx, maxy)
@property
def minlat(self) -> float:
""" Southern-most latitude, corresponding to the minimum y coordinate.
"""
return self.coords[1]
@property
def maxlat(self) -> float:
""" Northern-most latitude, corresponding to the maximum y coordinate.
"""
return self.coords[3]
@property
def minlon(self) -> float:
""" Western-most longitude, corresponding to the minimum x coordinate.
"""
return self.coords[0]
@property
def maxlon(self) -> float:
""" Eastern-most longitude, corresponding to the maximum x coordinate.
"""
return self.coords[2]
@staticmethod
def from_wkb(wkb: Optional[bytes]) -> 'Optional[Bbox]':
""" Create a Bbox from a bounding box polygon as returned by
the database. Return s None if the input value is None.
"""
if wkb is None:
return None
if len(wkb) != 97:
raise ValueError("WKB must be a bounding box polygon")
if wkb.startswith(WKB_BBOX_HEADER_LE):
x1, y1, _, _, x2, y2 = unpack('<dddddd', wkb[17:65])
elif wkb.startswith(WKB_BBOX_HEADER_BE):
x1, y1, _, _, x2, y2 = unpack('>dddddd', wkb[17:65])
else:
raise ValueError("WKB has wrong header")
return Bbox(min(x1, x2), min(y1, y2), max(x1, x2), max(y1, y2))
@staticmethod
def from_point(pt: Point, buffer: float) -> 'Bbox':
""" Return a Bbox around the point with the buffer added to all sides.
"""
return Bbox(pt[0] - buffer, pt[1] - buffer,
pt[0] + buffer, pt[1] + buffer)
class GeometryFormat(enum.Flag):
""" Geometry output formats supported by Nominatim.
"""
@ -117,3 +188,18 @@ class LookupDetails:
keywords: bool = False
""" Add information about the search terms used for this place.
"""
geometry_simplification: float = 0.0
""" Simplification factor for a geometry in degrees WGS. A factor of
0.0 means the original geometry is kept. The higher the value, the
more the geometry gets simplified.
"""
class DataLayer(enum.Flag):
""" Layer types that can be selected for reverse and forward search.
"""
POI = enum.auto()
ADDRESS = enum.auto()
RAILWAY = enum.auto()
MANMADE = enum.auto()
NATURAL = enum.auto()

View File

@ -10,6 +10,80 @@ Hard-coded information about tag catagories.
These tables have been copied verbatim from the old PHP code. For future
version a more flexible formatting is required.
"""
from typing import Tuple, Optional, Mapping
import nominatim.api as napi
def get_label_tag(category: Tuple[str, str], extratags: Optional[Mapping[str, str]],
rank: int, country: Optional[str]) -> str:
""" Create a label tag for the given place that can be used as an XML name.
"""
if rank < 26 and extratags and 'place' in extratags:
label = extratags['place']
elif rank < 26 and extratags and 'linked_place' in extratags:
label = extratags['linked_place']
elif category == ('boundary', 'administrative'):
label = ADMIN_LABELS.get((country or '', int(rank/2)))\
or ADMIN_LABELS.get(('', int(rank/2)))\
or 'Administrative'
elif category[1] == 'postal_code':
label = 'postcode'
elif rank < 26:
label = category[1] if category[1] != 'yes' else category[0]
elif rank < 28:
label = 'road'
elif category[0] == 'place'\
and category[1] in ('house_number', 'house_name', 'country_code'):
label = category[1]
else:
label = category[0]
return label.lower().replace(' ', '_')
def bbox_from_result(result: napi.ReverseResult) -> napi.Bbox:
""" Compute a bounding box for the result. For ways and relations
a given boundingbox is used. For all other object, a box is computed
around the centroid according to dimensions dereived from the
search rank.
"""
if (result.osm_object and result.osm_object[0] == 'N') or result.bbox is None:
extent = NODE_EXTENT.get(result.category, 0.00005)
return napi.Bbox.from_point(result.centroid, extent)
return result.bbox
# pylint: disable=line-too-long
OSM_ATTRIBUTION = 'Data © OpenStreetMap contributors, ODbL 1.0. http://osm.org/copyright'
OSM_TYPE_NAME = {
'N': 'node',
'W': 'way',
'R': 'relation'
}
ADMIN_LABELS = {
('', 1): 'Continent',
('', 2): 'Country',
('', 3): 'Region',
('', 4): 'State',
('', 5): 'State District',
('', 6): 'County',
('', 7): 'Municipality',
('', 8): 'City',
('', 9): 'City District',
('', 10): 'Suburb',
('', 11): 'Neighbourhood',
('', 12): 'City Block',
('no', 3): 'State',
('no', 4): 'County',
('se', 3): 'State',
('se', 4): 'County'
}
ICONS = {
('boundary', 'administrative'): 'poi_boundary_administrative',
@ -96,3 +170,31 @@ ICONS = {
('amenity', 'prison'): 'amenity_prison',
('highway', 'bus_stop'): 'transport_bus_stop2'
}
NODE_EXTENT = {
('place', 'continent'): 25,
('place', 'country'): 7,
('place', 'state'): 2.6,
('place', 'province'): 2.6,
('place', 'region'): 1.0,
('place', 'county'): 0.7,
('place', 'city'): 0.16,
('place', 'municipality'): 0.16,
('place', 'island'): 0.32,
('place', 'postcode'): 0.16,
('place', 'town'): 0.04,
('place', 'village'): 0.02,
('place', 'hamlet'): 0.02,
('place', 'district'): 0.02,
('place', 'borough'): 0.02,
('place', 'suburb'): 0.02,
('place', 'locality'): 0.01,
('place', 'neighbourhood'): 0.01,
('place', 'quarter'): 0.01,
('place', 'city_block'): 0.01,
('landuse', 'farm'): 0.01,
('place', 'farm'): 0.01,
('place', 'airport'): 0.015,
('aeroway', 'aerodrome'): 0.015,
('railway', 'station'): 0.005
}

View File

@ -13,6 +13,7 @@ import collections
import nominatim.api as napi
from nominatim.api.result_formatting import FormatDispatcher
from nominatim.api.v1.classtypes import ICONS
from nominatim.api.v1 import format_json, format_xml
from nominatim.utils.json_writer import JsonWriter
dispatch = FormatDispatcher()
@ -92,8 +93,8 @@ def _add_parent_rows_grouped(writer: JsonWriter, rows: napi.AddressLines,
writer.end_object().next()
@dispatch.format_func(napi.SearchResult, 'details-json')
def _format_search_json(result: napi.SearchResult, options: Mapping[str, Any]) -> str:
@dispatch.format_func(napi.DetailedResult, 'json')
def _format_details_json(result: napi.DetailedResult, options: Mapping[str, Any]) -> str:
locales = options.get('locales', napi.Locales())
geom = result.geometry.get('geojson')
centroid = result.centroid.to_geojson()
@ -161,3 +162,36 @@ def _format_search_json(result: napi.SearchResult, options: Mapping[str, Any]) -
out.end_object()
return out()
@dispatch.format_func(napi.ReverseResults, 'xml')
def _format_reverse_xml(results: napi.ReverseResults, options: Mapping[str, Any]) -> str:
return format_xml.format_base_xml(results,
options, True, 'reversegeocode',
{'querystring': 'TODO'})
@dispatch.format_func(napi.ReverseResults, 'geojson')
def _format_reverse_geojson(results: napi.ReverseResults,
options: Mapping[str, Any]) -> str:
return format_json.format_base_geojson(results, options, True)
@dispatch.format_func(napi.ReverseResults, 'geocodejson')
def _format_reverse_geocodejson(results: napi.ReverseResults,
options: Mapping[str, Any]) -> str:
return format_json.format_base_geocodejson(results, options, True)
@dispatch.format_func(napi.ReverseResults, 'json')
def _format_reverse_json(results: napi.ReverseResults,
options: Mapping[str, Any]) -> str:
return format_json.format_base_json(results, options, True,
class_label='class')
@dispatch.format_func(napi.ReverseResults, 'jsonv2')
def _format_reverse_jsonv2(results: napi.ReverseResults,
options: Mapping[str, Any]) -> str:
return format_json.format_base_json(results, options, True,
class_label='category')

View File

@ -0,0 +1,287 @@
# SPDX-License-Identifier: GPL-3.0-or-later
#
# This file is part of Nominatim. (https://nominatim.org)
#
# Copyright (C) 2023 by the Nominatim developer community.
# For a full list of authors see the git log.
"""
Helper functions for output of results in json formats.
"""
from typing import Mapping, Any, Optional, Tuple
import nominatim.api as napi
import nominatim.api.v1.classtypes as cl
from nominatim.utils.json_writer import JsonWriter
def _write_osm_id(out: JsonWriter, osm_object: Optional[Tuple[str, int]]) -> None:
if osm_object is not None:
out.keyval_not_none('osm_type', cl.OSM_TYPE_NAME.get(osm_object[0], None))\
.keyval('osm_id', osm_object[1])
def _write_typed_address(out: JsonWriter, address: Optional[napi.AddressLines],
country_code: Optional[str]) -> None:
parts = {}
for line in (address or []):
if line.isaddress:
if line.local_name:
label = cl.get_label_tag(line.category, line.extratags,
line.rank_address, country_code)
if label not in parts:
parts[label] = line.local_name
if line.names and 'ISO3166-2' in line.names and line.admin_level:
parts[f"ISO3166-2-lvl{line.admin_level}"] = line.names['ISO3166-2']
for k, v in parts.items():
out.keyval(k, v)
if country_code:
out.keyval('country_code', country_code)
def _write_geocodejson_address(out: JsonWriter,
address: Optional[napi.AddressLines],
obj_place_id: Optional[int],
country_code: Optional[str]) -> None:
extra = {}
for line in (address or []):
if line.isaddress and line.local_name:
if line.category[1] in ('postcode', 'postal_code'):
out.keyval('postcode', line.local_name)
elif line.category[1] == 'house_number':
out.keyval('housenumber', line.local_name)
elif (obj_place_id is None or obj_place_id != line.place_id) \
and line.rank_address >= 4 and line.rank_address < 28:
extra[GEOCODEJSON_RANKS[line.rank_address]] = line.local_name
for k, v in extra.items():
out.keyval(k, v)
if country_code:
out.keyval('country_code', country_code)
def format_base_json(results: napi.ReverseResults, #pylint: disable=too-many-branches
options: Mapping[str, Any], simple: bool,
class_label: str) -> str:
""" Return the result list as a simple json string in custom Nominatim format.
"""
locales = options.get('locales', napi.Locales())
out = JsonWriter()
if simple:
if not results:
return '{"error":"Unable to geocode"}'
else:
out.start_array()
for result in results:
label_parts = result.address_rows.localize(locales) if result.address_rows else []
out.start_object()\
.keyval_not_none('place_id', result.place_id)\
.keyval('licence', cl.OSM_ATTRIBUTION)\
_write_osm_id(out, result.osm_object)
out.keyval('lat', result.centroid.lat)\
.keyval('lon', result.centroid.lon)\
.keyval(class_label, result.category[0])\
.keyval('type', result.category[1])\
.keyval('place_rank', result.rank_search)\
.keyval('importance', result.calculated_importance())\
.keyval('addresstype', cl.get_label_tag(result.category, result.extratags,
result.rank_address,
result.country_code))\
.keyval('name', locales.display_name(result.names))\
.keyval('display_name', ', '.join(label_parts))
if options.get('icon_base_url', None):
icon = cl.ICONS.get(result.category)
if icon:
out.keyval('icon', f"{options['icon_base_url']}/{icon}.p.20.png")
if options.get('addressdetails', False):
out.key('address').start_object()
_write_typed_address(out, result.address_rows, result.country_code)
out.end_object().next()
if options.get('extratags', False):
out.keyval('extratags', result.extratags)
if options.get('namedetails', False):
out.keyval('namedetails', result.names)
bbox = cl.bbox_from_result(result)
out.key('boundingbox').start_array()\
.value(f"{bbox.minlat:0.7f}").next()\
.value(f"{bbox.maxlat:0.7f}").next()\
.value(f"{bbox.minlon:0.7f}").next()\
.value(f"{bbox.maxlon:0.7f}").next()\
.end_array().next()
if result.geometry:
for key in ('text', 'kml'):
out.keyval_not_none('geo' + key, result.geometry.get(key))
if 'geojson' in result.geometry:
out.key('geojson').raw(result.geometry['geojson']).next()
out.keyval_not_none('svg', result.geometry.get('svg'))
out.end_object()
if simple:
return out()
out.next()
out.end_array()
return out()
def format_base_geojson(results: napi.ReverseResults,
options: Mapping[str, Any],
simple: bool) -> str:
""" Return the result list as a geojson string.
"""
if not results and simple:
return '{"error":"Unable to geocode"}'
locales = options.get('locales', napi.Locales())
out = JsonWriter()
out.start_object()\
.keyval('type', 'FeatureCollection')\
.keyval('licence', cl.OSM_ATTRIBUTION)\
.key('features').start_array()
for result in results:
if result.address_rows:
label_parts = result.address_rows.localize(locales)
else:
label_parts = []
out.start_object()\
.keyval('type', 'Feature')\
.key('properties').start_object()
out.keyval_not_none('place_id', result.place_id)
_write_osm_id(out, result.osm_object)
out.keyval('place_rank', result.rank_search)\
.keyval('category', result.category[0])\
.keyval('type', result.category[1])\
.keyval('importance', result.calculated_importance())\
.keyval('addresstype', cl.get_label_tag(result.category, result.extratags,
result.rank_address,
result.country_code))\
.keyval('name', locales.display_name(result.names))\
.keyval('display_name', ', '.join(label_parts))
if options.get('addressdetails', False):
out.key('address').start_object()
_write_typed_address(out, result.address_rows, result.country_code)
out.end_object().next()
if options.get('extratags', False):
out.keyval('extratags', result.extratags)
if options.get('namedetails', False):
out.keyval('namedetails', result.names)
out.end_object().next() # properties
out.key('bbox').start_array()
for coord in cl.bbox_from_result(result).coords:
out.float(coord, 7).next()
out.end_array().next()
out.key('geometry').raw(result.geometry.get('geojson')
or result.centroid.to_geojson()).next()
out.end_object().next()
out.end_array().next().end_object()
return out()
def format_base_geocodejson(results: napi.ReverseResults,
options: Mapping[str, Any], simple: bool) -> str:
""" Return the result list as a geocodejson string.
"""
if not results and simple:
return '{"error":"Unable to geocode"}'
locales = options.get('locales', napi.Locales())
out = JsonWriter()
out.start_object()\
.keyval('type', 'FeatureCollection')\
.key('geocoding').start_object()\
.keyval('version', '0.1.0')\
.keyval('attribution', cl.OSM_ATTRIBUTION)\
.keyval('licence', 'ODbL')\
.keyval_not_none('query', options.get('query'))\
.end_object().next()\
.key('features').start_array()
for result in results:
if result.address_rows:
label_parts = result.address_rows.localize(locales)
else:
label_parts = []
out.start_object()\
.keyval('type', 'Feature')\
.key('properties').start_object()\
.key('geocoding').start_object()
out.keyval_not_none('place_id', result.place_id)
_write_osm_id(out, result.osm_object)
out.keyval('osm_key', result.category[0])\
.keyval('osm_value', result.category[1])\
.keyval('type', GEOCODEJSON_RANKS[max(3, min(28, result.rank_address))])\
.keyval_not_none('accuracy', result.distance, transform=int)\
.keyval('label', ', '.join(label_parts))\
.keyval_not_none('name', result.names, transform=locales.display_name)\
if options.get('addressdetails', False):
_write_geocodejson_address(out, result.address_rows, result.place_id,
result.country_code)
out.key('admin').start_object()
if result.address_rows:
for line in result.address_rows:
if line.isaddress and (line.admin_level or 15) < 15 and line.local_name:
out.keyval(f"level{line.admin_level}", line.local_name)
out.end_object().next()
out.end_object().next().end_object().next()
out.key('geometry').raw(result.geometry.get('geojson')
or result.centroid.to_geojson()).next()
out.end_object().next()
out.end_array().next().end_object()
return out()
GEOCODEJSON_RANKS = {
3: 'locality',
4: 'country',
5: 'state', 6: 'state', 7: 'state', 8: 'state', 9: 'state',
10: 'county', 11: 'county', 12: 'county',
13: 'city', 14: 'city', 15: 'city', 16: 'city',
17: 'district', 18: 'district', 19: 'district', 20: 'district', 21: 'district',
22: 'locality', 23: 'locality', 24: 'locality',
25: 'street', 26: 'street', 27: 'street', 28: 'house'}

View File

@ -0,0 +1,131 @@
# SPDX-License-Identifier: GPL-3.0-or-later
#
# This file is part of Nominatim. (https://nominatim.org)
#
# Copyright (C) 2023 by the Nominatim developer community.
# For a full list of authors see the git log.
"""
Helper functions for output of results in XML format.
"""
from typing import Mapping, Any, Optional
import datetime as dt
import xml.etree.ElementTree as ET
import nominatim.api as napi
import nominatim.api.v1.classtypes as cl
def _write_xml_address(root: ET.Element, address: napi.AddressLines,
country_code: Optional[str]) -> None:
parts = {}
for line in address:
if line.isaddress:
if line.local_name:
label = cl.get_label_tag(line.category, line.extratags,
line.rank_address, country_code)
if label not in parts:
parts[label] = line.local_name
if line.names and 'ISO3166-2' in line.names and line.admin_level:
parts[f"ISO3166-2-lvl{line.admin_level}"] = line.names['ISO3166-2']
for k,v in parts.items():
ET.SubElement(root, k).text = v
if country_code:
ET.SubElement(root, 'country_code').text = country_code
def _create_base_entry(result: napi.ReverseResult, #pylint: disable=too-many-branches
root: ET.Element, simple: bool,
locales: napi.Locales) -> ET.Element:
if result.address_rows:
label_parts = result.address_rows.localize(locales)
else:
label_parts = []
place = ET.SubElement(root, 'result' if simple else 'place')
if result.place_id is not None:
place.set('place_id', str(result.place_id))
if result.osm_object:
osm_type = cl.OSM_TYPE_NAME.get(result.osm_object[0], None)
if osm_type is not None:
place.set('osm_type', osm_type)
place.set('osm_id', str(result.osm_object[1]))
if result.names and 'ref' in result.names:
place.set('ref', result.names['ref'])
elif label_parts:
# bug reproduced from PHP
place.set('ref', label_parts[0])
place.set('lat', f"{result.centroid.lat:.7f}")
place.set('lon', f"{result.centroid.lon:.7f}")
bbox = cl.bbox_from_result(result)
place.set('boundingbox',
f"{bbox.minlat:.7f},{bbox.maxlat:.7f},{bbox.minlon:.7f},{bbox.maxlon:.7f}")
place.set('place_rank', str(result.rank_search))
place.set('address_rank', str(result.rank_address))
if result.geometry:
for key in ('text', 'svg'):
if key in result.geometry:
place.set('geo' + key, result.geometry[key])
if 'kml' in result.geometry:
ET.SubElement(root if simple else place, 'geokml')\
.append(ET.fromstring(result.geometry['kml']))
if 'geojson' in result.geometry:
place.set('geojson', result.geometry['geojson'])
if simple:
place.text = ', '.join(label_parts)
else:
place.set('display_name', ', '.join(label_parts))
place.set('class', result.category[0])
place.set('type', result.category[1])
place.set('importance', str(result.calculated_importance()))
return place
def format_base_xml(results: napi.ReverseResults,
options: Mapping[str, Any],
simple: bool, xml_root_tag: str,
xml_extra_info: Mapping[str, str]) -> str:
""" Format the result into an XML response. With 'simple' exactly one
result will be output, otherwise a list.
"""
locales = options.get('locales', napi.Locales())
root = ET.Element(xml_root_tag)
root.set('timestamp', dt.datetime.utcnow().strftime('%a, %d %b %Y %H:%M:%S +00:00'))
root.set('attribution', cl.OSM_ATTRIBUTION)
for k, v in xml_extra_info.items():
root.set(k, v)
if simple and not results:
ET.SubElement(root, 'error').text = 'Unable to geocode'
for result in results:
place = _create_base_entry(result, root, simple, locales)
if not simple and options.get('icon_base_url', None):
icon = cl.ICONS.get(result.category)
if icon:
place.set('icon', icon)
if options.get('addressdetails', False) and result.address_rows:
_write_xml_address(ET.SubElement(root, 'addressparts') if simple else place,
result.address_rows, result.country_code)
if options.get('extratags', False):
eroot = ET.SubElement(root if simple else place, 'extratags')
if result.extratags:
for k, v in result.extratags.items():
ET.SubElement(eroot, 'tag', attrib={'key': k, 'value': v})
if options.get('namedetails', False):
eroot = ET.SubElement(root if simple else place, 'namedetails')
if result.names:
for k,v in result.names.items():
ET.SubElement(eroot, 'name', attrib={'desc': k}).text = v
return '<?xml version="1.0" encoding="UTF-8" ?>\n' + ET.tostring(root, encoding='unicode')

View File

@ -8,8 +8,10 @@
Generic part of the server implementation of the v1 API.
Combine with the scaffolding provided for the various Python ASGI frameworks.
"""
from typing import Optional, Any, Type, Callable
from typing import Optional, Any, Type, Callable, NoReturn, cast
from functools import reduce
import abc
import math
from nominatim.config import Configuration
import nominatim.api as napi
@ -19,15 +21,14 @@ from nominatim.api.v1.format import dispatch as formatting
CONTENT_TYPE = {
'text': 'text/plain; charset=utf-8',
'xml': 'text/xml; charset=utf-8',
'jsonp': 'application/javascript',
'debug': 'text/html; charset=utf-8'
}
class ASGIAdaptor(abc.ABC):
""" Adapter class for the different ASGI frameworks.
Wraps functionality over concrete requests and responses.
"""
content_type: str = 'text/plain; charset=utf-8'
@abc.abstractmethod
def get(self, name: str, default: Optional[str] = None) -> Optional[str]:
@ -50,7 +51,7 @@ class ASGIAdaptor(abc.ABC):
@abc.abstractmethod
def create_response(self, status: int, output: str, content_type: str) -> Any:
def create_response(self, status: int, output: str) -> Any:
""" Create a response from the given parameters. The result will
be returned by the endpoint functions. The adaptor may also
return None when the response is created internally with some
@ -68,20 +69,42 @@ class ASGIAdaptor(abc.ABC):
"""
def build_response(self, output: str, media_type: str, status: int = 200) -> Any:
def build_response(self, output: str, status: int = 200) -> Any:
""" Create a response from the given output. Wraps a JSONP function
around the response, if necessary.
"""
if media_type == 'json' and status == 200:
if self.content_type == 'application/json' and status == 200:
jsonp = self.get('json_callback')
if jsonp is not None:
if any(not part.isidentifier() for part in jsonp.split('.')):
raise self.error('Invalid json_callback value')
self.raise_error('Invalid json_callback value')
output = f"{jsonp}({output})"
media_type = 'jsonp'
self.content_type = 'application/javascript'
return self.create_response(status, output,
CONTENT_TYPE.get(media_type, 'application/json'))
return self.create_response(status, output)
def raise_error(self, msg: str, status: int = 400) -> NoReturn:
""" Raise an exception resulting in the given HTTP status and
message. The message will be formatted according to the
output format chosen by the request.
"""
if self.content_type == 'text/xml; charset=utf-8':
msg = f"""<?xml version="1.0" encoding="UTF-8" ?>
<error>
<code>{status}</code>
<message>{msg}</message>
</error>
"""
elif self.content_type == 'application/json':
msg = f"""{{"error":{{"code":{status},"message":"{msg}"}}}}"""
elif self.content_type == 'text/html; charset=utf-8':
loglib.log().section('Execution error')
loglib.log().var_dump('Status', status)
loglib.log().var_dump('Message', msg)
msg = loglib.get_and_disable()
raise self.error(msg, status)
def get_int(self, name: str, default: Optional[int] = None) -> int:
@ -98,12 +121,41 @@ class ASGIAdaptor(abc.ABC):
if default is not None:
return default
raise self.error(f"Parameter '{name}' missing.")
self.raise_error(f"Parameter '{name}' missing.")
try:
return int(value)
except ValueError as exc:
raise self.error(f"Parameter '{name}' must be a number.") from exc
intval = int(value)
except ValueError:
self.raise_error(f"Parameter '{name}' must be a number.")
return intval
def get_float(self, name: str, default: Optional[float] = None) -> float:
""" Return an input parameter as a flaoting-point number. Raises an
exception if the parameter is given but not in an float format.
If 'default' is given, then it will be returned when the parameter
is missing completely. When 'default' is None, an error will be
raised on a missing parameter.
"""
value = self.get(name)
if value is None:
if default is not None:
return default
self.raise_error(f"Parameter '{name}' missing.")
try:
fval = float(value)
except ValueError:
self.raise_error(f"Parameter '{name}' must be a number.")
if math.isnan(fval) or math.isinf(fval):
self.raise_error(f"Parameter '{name}' must be a number.")
return fval
def get_bool(self, name: str, default: Optional[bool] = None) -> bool:
@ -120,13 +172,13 @@ class ASGIAdaptor(abc.ABC):
if default is not None:
return default
raise self.error(f"Parameter '{name}' missing.")
self.raise_error(f"Parameter '{name}' missing.")
return value != '0'
def get_accepted_languages(self) -> str:
""" Return the accepted langauges.
""" Return the accepted languages.
"""
return self.get('accept-language')\
or self.get_header('http_accept_language')\
@ -140,24 +192,38 @@ class ASGIAdaptor(abc.ABC):
"""
if self.get_bool('debug', False):
loglib.set_log_output('html')
self.content_type = 'text/html; charset=utf-8'
return True
return False
def parse_format(params: ASGIAdaptor, result_type: Type[Any], default: str) -> str:
""" Get and check the 'format' parameter and prepare the formatter.
`fmtter` is a formatter and `default` the
format value to assume when no parameter is present.
"""
fmt = params.get('format', default=default)
assert fmt is not None
def get_layers(self) -> Optional[napi.DataLayer]:
""" Return a parsed version of the layer parameter.
"""
param = self.get('layer', None)
if param is None:
return None
if not formatting.supports_format(result_type, fmt):
raise params.error("Parameter 'format' must be one of: " +
', '.join(formatting.list_formats(result_type)))
return cast(napi.DataLayer,
reduce(napi.DataLayer.__or__,
(getattr(napi.DataLayer, s.upper()) for s in param.split(','))))
return fmt
def parse_format(self, result_type: Type[Any], default: str) -> str:
""" Get and check the 'format' parameter and prepare the formatter.
`result_type` is the type of result to be returned by the function
and `default` the format value to assume when no parameter is present.
"""
fmt = self.get('format', default=default)
assert fmt is not None
if not formatting.supports_format(result_type, fmt):
self.raise_error("Parameter 'format' must be one of: " +
', '.join(formatting.list_formats(result_type)))
self.content_type = CONTENT_TYPE.get(fmt, 'application/json')
return fmt
async def status_endpoint(api: napi.NominatimAPIAsync, params: ASGIAdaptor) -> Any:
@ -165,20 +231,21 @@ async def status_endpoint(api: napi.NominatimAPIAsync, params: ASGIAdaptor) -> A
"""
result = await api.status()
fmt = parse_format(params, napi.StatusResult, 'text')
fmt = params.parse_format(napi.StatusResult, 'text')
if fmt == 'text' and result.status:
status_code = 500
else:
status_code = 200
return params.build_response(formatting.format_result(result, fmt, {}), fmt,
return params.build_response(formatting.format_result(result, fmt, {}),
status=status_code)
async def details_endpoint(api: napi.NominatimAPIAsync, params: ASGIAdaptor) -> Any:
""" Server glue for /details endpoint. See API docs for details.
"""
fmt = params.parse_format(napi.DetailedResult, 'json')
place_id = params.get_int('place_id', 0)
place: napi.PlaceRef
if place_id:
@ -186,7 +253,7 @@ async def details_endpoint(api: napi.NominatimAPIAsync, params: ASGIAdaptor) ->
else:
osmtype = params.get('osmtype')
if osmtype is None:
raise params.error("Missing ID parameter 'place_id' or 'osmtype'.")
params.raise_error("Missing ID parameter 'place_id' or 'osmtype'.")
place = napi.OsmID(osmtype, params.get_int('osmid'), params.get('class'))
debug = params.setup_debugging()
@ -204,24 +271,91 @@ async def details_endpoint(api: napi.NominatimAPIAsync, params: ASGIAdaptor) ->
result = await api.lookup(place, details)
if debug:
return params.build_response(loglib.get_and_disable(), 'debug')
return params.build_response(loglib.get_and_disable())
if result is None:
raise params.error('No place with that OSM ID found.', status=404)
params.raise_error('No place with that OSM ID found.', status=404)
output = formatting.format_result(
result,
'details-json',
output = formatting.format_result(result, fmt,
{'locales': locales,
'group_hierarchy': params.get_bool('group_hierarchy', False),
'icon_base_url': params.config().MAPICON_URL})
return params.build_response(output, 'json')
return params.build_response(output)
async def reverse_endpoint(api: napi.NominatimAPIAsync, params: ASGIAdaptor) -> Any:
""" Server glue for /reverse endpoint. See API docs for details.
"""
fmt = params.parse_format(napi.ReverseResults, 'xml')
debug = params.setup_debugging()
coord = napi.Point(params.get_float('lon'), params.get_float('lat'))
locales = napi.Locales.from_accept_languages(params.get_accepted_languages())
zoom = max(0, min(18, params.get_int('zoom', 18)))
details = napi.LookupDetails(address_details=True,
geometry_simplification=params.get_float('polygon_threshold', 0.0))
numgeoms = 0
if params.get_bool('polygon_geojson', False):
details.geometry_output |= napi.GeometryFormat.GEOJSON
numgeoms += 1
if fmt not in ('geojson', 'geocodejson'):
if params.get_bool('polygon_text', False):
details.geometry_output |= napi.GeometryFormat.TEXT
numgeoms += 1
if params.get_bool('polygon_kml', False):
details.geometry_output |= napi.GeometryFormat.KML
numgeoms += 1
if params.get_bool('polygon_svg', False):
details.geometry_output |= napi.GeometryFormat.SVG
numgeoms += 1
if numgeoms > params.config().get_int('POLYGON_OUTPUT_MAX_TYPES'):
params.raise_error('Too many polgyon output options selected.')
result = await api.reverse(coord, REVERSE_MAX_RANKS[zoom],
params.get_layers() or
napi.DataLayer.ADDRESS | napi.DataLayer.POI,
details)
if debug:
return params.build_response(loglib.get_and_disable())
fmt_options = {'locales': locales,
'extratags': params.get_bool('extratags', False),
'namedetails': params.get_bool('namedetails', False),
'addressdetails': params.get_bool('addressdetails', True)}
if fmt == 'xml':
fmt_options['xml_roottag'] = 'reversegeocode'
fmt_options['xml_extra_info'] = {'querystring': 'TODO'}
output = formatting.format_result(napi.ReverseResults([result] if result else []),
fmt, fmt_options)
return params.build_response(output)
EndpointFunc = Callable[[napi.NominatimAPIAsync, ASGIAdaptor], Any]
REVERSE_MAX_RANKS = [2, 2, 2, # 0-2 Continent/Sea
4, 4, # 3-4 Country
8, # 5 State
10, 10, # 6-7 Region
12, 12, # 8-9 County
16, 17, # 10-11 City
18, # 12 Town
19, # 13 Village/Suburb
22, # 14 Hamlet/Neighbourhood
25, # 15 Localities
26, # 16 Major Streets
27, # 17 Minor Streets
30 # 18 Building
]
ROUTES = [
('status', status_endpoint),
('details', details_endpoint)
('details', details_endpoint),
('reverse', reverse_endpoint)
]

View File

@ -18,6 +18,7 @@ from nominatim.errors import UsageError
from nominatim.clicmd.args import NominatimArgs
import nominatim.api as napi
import nominatim.api.v1 as api_output
from nominatim.api.v1.server_glue import REVERSE_MAX_RANKS
# Do not repeat documentation of subcommand classes.
# pylint: disable=C0111
@ -53,7 +54,8 @@ def _add_api_output_arguments(parser: argparse.ArgumentParser) -> None:
group.add_argument('--polygon-output',
choices=['geojson', 'kml', 'svg', 'text'],
help='Output geometry of results as a GeoJSON, KML, SVG or WKT')
group.add_argument('--polygon-threshold', type=float, metavar='TOLERANCE',
group.add_argument('--polygon-threshold', type=float, default = 0.0,
metavar='TOLERANCE',
help=("Simplify output geometry."
"Parameter is difference tolerance in degrees."))
@ -150,26 +152,46 @@ class APIReverse:
help='Longitude of coordinate to look up (in WGS84)')
group.add_argument('--zoom', type=int,
help='Level of detail required for the address')
group.add_argument('--layer', metavar='LAYER',
choices=[n.name.lower() for n in napi.DataLayer if n.name],
action='append', required=False, dest='layers',
help='OSM id to lookup in format <NRW><id> (may be repeated)')
_add_api_output_arguments(parser)
def run(self, args: NominatimArgs) -> int:
params = dict(lat=args.lat, lon=args.lon, format=args.format)
if args.zoom is not None:
params['zoom'] = args.zoom
api = napi.NominatimAPI(args.project_dir)
for param, _ in EXTRADATA_PARAMS:
if getattr(args, param):
params[param] = '1'
if args.lang:
params['accept-language'] = args.lang
if args.polygon_output:
params['polygon_' + args.polygon_output] = '1'
if args.polygon_threshold:
params['polygon_threshold'] = args.polygon_threshold
details = napi.LookupDetails(address_details=True, # needed for display name
geometry_output=args.get_geometry_output(),
geometry_simplification=args.polygon_threshold or 0.0)
result = api.reverse(napi.Point(args.lon, args.lat),
REVERSE_MAX_RANKS[max(0, min(18, args.zoom or 18))],
args.get_layers(napi.DataLayer.ADDRESS | napi.DataLayer.POI),
details)
if result:
output = api_output.format_result(
napi.ReverseResults([result]),
args.format,
{'locales': args.get_locales(api.config.DEFAULT_LANGUAGE),
'extratags': args.extratags,
'namedetails': args.namedetails,
'addressdetails': args.addressdetails})
if args.format != 'xml':
# reformat the result, so it is pretty-printed
json.dump(json.loads(output), sys.stdout, indent=4, ensure_ascii=False)
else:
sys.stdout.write(output)
sys.stdout.write('\n')
return 0
LOG.error("Unable to geocode.")
return 42
return _run_api('reverse', args, params)
class APILookup:
@ -270,23 +292,16 @@ class APIDetails:
if args.polygon_geojson:
details.geometry_output = napi.GeometryFormat.GEOJSON
if args.lang:
locales = napi.Locales.from_accept_languages(args.lang)
elif api.config.DEFAULT_LANGUAGE:
locales = napi.Locales.from_accept_languages(api.config.DEFAULT_LANGUAGE)
else:
locales = napi.Locales()
result = api.lookup(place, details)
if result:
output = api_output.format_result(
result,
'details-json',
{'locales': locales,
'json',
{'locales': args.get_locales(api.config.DEFAULT_LANGUAGE),
'group_hierarchy': args.group_hierarchy})
# reformat the result, so it is pretty-printed
json.dump(json.loads(output), sys.stdout, indent=4)
json.dump(json.loads(output), sys.stdout, indent=4, ensure_ascii=False)
sys.stdout.write('\n')
return 0

View File

@ -10,11 +10,13 @@ Provides custom functions over command-line arguments.
from typing import Optional, List, Dict, Any, Sequence, Tuple
import argparse
import logging
from functools import reduce
from pathlib import Path
from nominatim.errors import UsageError
from nominatim.config import Configuration
from nominatim.typing import Protocol
import nominatim.api as napi
LOG = logging.getLogger()
@ -162,6 +164,7 @@ class NominatimArgs:
lat: float
lon: float
zoom: Optional[int]
layers: Optional[Sequence[str]]
# Arguments to 'lookup'
ids: Sequence[str]
@ -211,3 +214,45 @@ class NominatimArgs:
raise UsageError('Cannot access file.')
return files
def get_geometry_output(self) -> napi.GeometryFormat:
""" Get the requested geometry output format in a API-compatible
format.
"""
if not self.polygon_output:
return napi.GeometryFormat.NONE
if self.polygon_output == 'geojson':
return napi.GeometryFormat.GEOJSON
if self.polygon_output == 'kml':
return napi.GeometryFormat.KML
if self.polygon_output == 'svg':
return napi.GeometryFormat.SVG
if self.polygon_output == 'text':
return napi.GeometryFormat.TEXT
try:
return napi.GeometryFormat[self.polygon_output.upper()]
except KeyError as exp:
raise UsageError(f"Unknown polygon output format '{self.polygon_output}'.") from exp
def get_locales(self, default: Optional[str]) -> napi.Locales:
""" Get the locales from the language parameter.
"""
if self.lang:
return napi.Locales.from_accept_languages(self.lang)
if default:
return napi.Locales.from_accept_languages(default)
return napi.Locales()
def get_layers(self, default: napi.DataLayer) -> Optional[napi.DataLayer]:
""" Get the list of selected layers as a DataLayer enum.
"""
if not self.layers:
return default
return reduce(napi.DataLayer.__or__,
(napi.DataLayer[s.upper()] for s in self.layers))

View File

@ -7,16 +7,34 @@
"""
Server implementation using the falcon webserver framework.
"""
from typing import Optional, Mapping, cast
from typing import Optional, Mapping, cast, Any
from pathlib import Path
import falcon
from falcon.asgi import App, Request, Response
from nominatim.api import NominatimAPIAsync
import nominatim.api.v1 as api_impl
from nominatim.config import Configuration
class HTTPNominatimError(Exception):
""" A special exception class for errors raised during processing.
"""
def __init__(self, msg: str, status: int, content_type: str) -> None:
self.msg = msg
self.status = status
self.content_type = content_type
async def nominatim_error_handler(req: Request, resp: Response, #pylint: disable=unused-argument
exception: HTTPNominatimError,
_: Any) -> None:
""" Special error handler that passes message and content type as
per exception info.
"""
resp.status = exception.status
resp.text = exception.msg
resp.content_type = exception.content_type
class ParamWrapper(api_impl.ASGIAdaptor):
""" Adaptor class for server glue to Falcon framework.
@ -37,19 +55,14 @@ class ParamWrapper(api_impl.ASGIAdaptor):
return cast(Optional[str], self.request.get_header(name, default=default))
def error(self, msg: str, status: int = 400) -> falcon.HTTPError:
if status == 400:
return falcon.HTTPBadRequest(description=msg)
if status == 404:
return falcon.HTTPNotFound(description=msg)
return falcon.HTTPError(status, description=msg)
def error(self, msg: str, status: int = 400) -> HTTPNominatimError:
return HTTPNominatimError(msg, status, self.content_type)
def create_response(self, status: int, output: str, content_type: str) -> None:
def create_response(self, status: int, output: str) -> None:
self.response.status = status
self.response.text = output
self.response.content_type = content_type
self.response.content_type = self.content_type
def config(self) -> Configuration:
@ -78,6 +91,7 @@ def get_application(project_dir: Path,
api = NominatimAPIAsync(project_dir, environ)
app = App(cors_enable=api.config.get_bool('CORS_NOACCESSCONTROL'))
app.add_error_handler(HTTPNominatimError, nominatim_error_handler)
legacy_urls = api.config.get_bool('SERVE_LEGACY_URLS')
for name, func in api_impl.ROUTES:
@ -87,3 +101,11 @@ def get_application(project_dir: Path,
app.add_route(f"/{name}.php", endpoint)
return app
def run_wsgi() -> App:
""" Entry point for uvicorn.
Make sure uvicorn is run from the project directory.
"""
return get_application(Path('.'))

View File

@ -35,12 +35,13 @@ class ParamWrapper(api_impl.ASGIAdaptor):
def error(self, msg: str, status: int = 400) -> SanicException:
return SanicException(msg, status_code=status)
exception = SanicException(msg, status_code=status)
return exception
def create_response(self, status: int, output: str,
content_type: str) -> HTTPResponse:
return TextResponse(output, status=status, content_type=content_type)
def create_response(self, status: int, output: str) -> HTTPResponse:
return TextResponse(output, status=status, content_type=self.content_type)
def config(self) -> Configuration:

View File

@ -39,11 +39,12 @@ class ParamWrapper(api_impl.ASGIAdaptor):
def error(self, msg: str, status: int = 400) -> HTTPException:
return HTTPException(status, detail=msg)
return HTTPException(status, detail=msg,
headers={'content-type': self.content_type})
def create_response(self, status: int, output: str, content_type: str) -> Response:
return Response(output, status_code=status, media_type=content_type)
def create_response(self, status: int, output: str) -> Response:
return Response(output, status_code=status, media_type=self.content_type)
def config(self) -> Configuration:
@ -59,7 +60,8 @@ def _wrap_endpoint(func: api_impl.EndpointFunc)\
def get_application(project_dir: Path,
environ: Optional[Mapping[str, str]] = None) -> Starlette:
environ: Optional[Mapping[str, str]] = None,
debug: bool = True) -> Starlette:
""" Create a Nominatim falcon ASGI application.
"""
config = Configuration(project_dir, environ)
@ -76,8 +78,14 @@ def get_application(project_dir: Path,
if config.get_bool('CORS_NOACCESSCONTROL'):
middleware.append(Middleware(CORSMiddleware, allow_origins=['*']))
app = Starlette(debug=True, routes=routes, middleware=middleware)
app = Starlette(debug=debug, routes=routes, middleware=middleware)
app.state.API = NominatimAPIAsync(project_dir, environ)
return app
def run_wsgi() -> Starlette:
""" Entry point for uvicorn.
"""
return get_application(Path('.'), debug=False)

View File

@ -53,7 +53,7 @@ else:
# SQLAlchemy introduced generic types in version 2.0 making typing
# inclompatiple with older versions. Add wrappers here so we don't have
# incompatible with older versions. Add wrappers here so we don't have
# to litter the code with bare-string types.
if TYPE_CHECKING:
@ -64,5 +64,7 @@ else:
SaSelect: TypeAlias = 'sa.Select[Any]'
SaRow: TypeAlias = 'sa.Row[Any]'
SaColumn: TypeAlias = 'sa.Column[Any]'
SaColumn: TypeAlias = 'sa.ColumnElement[Any]'
SaLabel: TypeAlias = 'sa.Label[Any]'
SaFromClause: TypeAlias = 'sa.FromClause'
SaSelectable: TypeAlias = 'sa.Selectable'

View File

@ -101,6 +101,11 @@ class JsonWriter:
return self.raw(json.dumps(value, ensure_ascii=False))
def float(self, value: float, precision: int) -> 'JsonWriter':
""" Write out a float value with the given precision.
"""
return self.raw(f"{value:0.{precision}f}")
def next(self) -> 'JsonWriter':
""" Write out a delimiter comma between JSON object or array elements.
"""

View File

@ -0,0 +1,88 @@
@APIDB
Feature: Layer parameter in reverse geocoding
Testing correct function of layer selection while reverse geocoding
@v1-api-python-only
Scenario: POIs are selected by default
When sending v1/reverse at 47.14077,9.52414
Then results contain
| category | type |
| tourism | viewpoint |
@v1-api-python-only
Scenario Outline: Same address level POI with different layers
When sending v1/reverse at 47.14077,9.52414
| layer |
| <layer> |
Then results contain
| category |
| <category> |
Examples:
| layer | category |
| address | highway |
| poi,address | tourism |
| address,poi | tourism |
| natural | waterway |
| address,natural | highway |
| natural,poi | tourism |
@v1-api-python-only
Scenario Outline: POIs are not selected without housenumber for address layer
When sending v1/reverse at 47.13816,9.52168
| layer |
| <layer> |
Then results contain
| category | type |
| <category> | <type> |
Examples:
| layer | category | type |
| address,poi | highway | bus_stop |
| address | amenity | parking |
@v1-api-python-only
Scenario: Between natural and low-zoom address prefer natural
When sending v1/reverse at 47.13636,9.52094
| layer | zoom |
| natural,address | 15 |
Then results contain
| category |
| waterway |
@v1-api-python-only
Scenario Outline: Search for mountain peaks begins at level 12
When sending v1/reverse at 47.08221,9.56769
| layer | zoom |
| natural | <zoom> |
Then results contain
| category | type |
| <category> | <type> |
Examples:
| zoom | category | type |
| 12 | natural | peak |
| 13 | waterway | river |
@v1-api-python-only
Scenario Outline: Reverse serach with manmade layers
When sending v1/reverse at 32.46904,-86.44439
| layer |
| <layer> |
Then results contain
| category | type |
| <category> | <type> |
Examples:
| layer | category | type |
| manmade | leisure | park |
| address | highway | residential |
| poi | leisure | pitch |
| natural | waterway | stream |
| natural,manmade | leisure | park |

View File

@ -19,7 +19,7 @@ Feature: Geocodejson for Reverse API
| Point | [9.5036065, 47.0660892] |
And results contain in field __geocoding
| version | licence | attribution |
| 0.1.0 | ODbL | Data © OpenStreetMap contributors, ODbL 1.0. https://osm.org/copyright |
| 0.1.0 | ODbL | ^Data © OpenStreetMap contributors, ODbL 1.0. https?://osm.org/copyright$ |
Examples:
| has_address | attributes |

View File

@ -42,7 +42,7 @@ Feature: Geojson for Reverse API
| way | 1 | 30 | place | house | place |
And results contain
| boundingbox |
| [47.118495392, 47.118595392, 9.57049676, 9.57059676] |
| ^\[47.118495\d*, 47.118595\d*, 9.570496\d*, 9.570596\d*\] |
And results contain
| display_name |
| 1019, Grosssteg, Sücka, Triesenberg, Oberland, 9497, Liechtenstein |

View File

@ -17,12 +17,12 @@ Feature: Json output for Reverse API
| 1 | attributes |
| 0 | not attributes |
Scenario Outline: Siple OSM result
Scenario Outline: Simple OSM result
When sending v1/reverse at 47.066,9.504 with format <format>
Then result has attributes place_id
And results contain
| licence |
| Data © OpenStreetMap contributors, ODbL 1.0. https://osm.org/copyright |
| ^Data © OpenStreetMap contributors, ODbL 1.0. https?://osm.org/copyright$ |
And results contain
| osm_type | osm_id |
| node | 6522627624 |
@ -62,7 +62,7 @@ Feature: Json output for Reverse API
| way | 1 |
And results contain
| centroid | boundingbox |
| 9.57054676 47.118545392 | ['47.118495392', '47.118595392', '9.57049676', '9.57059676'] |
| 9.57054676 47.118545392 | ^\['47.118495\d*', '47.118595\d*', '9.570496\d*', '9.570596\d*'\] |
And results contain
| display_name |
| 1019, Grosssteg, Sücka, Triesenberg, Oberland, 9497, Liechtenstein |

View File

@ -32,7 +32,7 @@ Feature: XML output for Reverse API
| way | 396009653 | 30 | 30 |
And results contain
| centroid | boundingbox |
| -86.4808553258 32.4753580256 | ^32.475308025\d*,32.475408025\d*,-86.480905325\d*,-86.480805325\d* |
| -86.4808553 32.4753580 | ^32.4753080\d*,32.4754080\d*,-86.4809053\d*,-86.4808053\d* |
And results contain
| display_name |
| 707, Upper Kingston Road, Upper Kingston, Prattville, Autauga County, 36067, United States |
@ -45,7 +45,7 @@ Feature: XML output for Reverse API
| way | 1 | 30 | 30 |
And results contain
| centroid | boundingbox |
| 9.57054676 47.118545392 | 47.118495392,47.118595392,9.57049676,9.57059676 |
| 9.57054676 47.118545392 | ^47.118495\d*,47.118595\d*,9.570496\d*,9.570596\d* |
And results contain
| display_name |
| 1019, Grosssteg, Sücka, Triesenberg, Oberland, 9497, Liechtenstein |

View File

@ -47,15 +47,16 @@ class Field:
""" Generic comparator for fields, which looks at the type of the
value compared.
"""
def __init__(self, value):
def __init__(self, value, **extra_args):
self.value = value
self.extra_args = extra_args
def __eq__(self, other):
if isinstance(self.value, float):
return math.isclose(self.value, float(other))
return math.isclose(self.value, float(other), **self.extra_args)
if self.value.startswith('^'):
return re.fullmatch(self.value, other)
return re.fullmatch(self.value, str(other))
if isinstance(other, dict):
return other == eval('{' + self.value + '}')

View File

@ -134,8 +134,8 @@ class GenericResponse:
lon, lat = context.osm.grid_node(int(value))
else:
raise RuntimeError("Context needed when using grid coordinates")
self.check_row_field(i, 'lat', Field(float(lat)), base=subdict)
self.check_row_field(i, 'lon', Field(float(lon)), base=subdict)
self.check_row_field(i, 'lat', Field(float(lat), abs_tol=1e-07), base=subdict)
self.check_row_field(i, 'lon', Field(float(lon), abs_tol=1e-07), base=subdict)
else:
self.check_row_field(i, name, Field(value), base=subdict)

View File

@ -229,7 +229,8 @@ def validate_result_number(context, operator, number):
@then(u'a HTTP (?P<status>\d+) is returned')
def check_http_return_status(context, status):
assert context.response.errorcode == int(status), \
f"Return HTTP status is {context.response.errorcode}."
f"Return HTTP status is {context.response.errorcode}."\
f" Full response:\n{context.response.page}"
@then(u'the page contents equals "(?P<text>.+)"')
def check_page_content_equals(context, text):

View File

@ -42,6 +42,9 @@ class APITester:
if isinstance(name, str):
name = {'name': name}
centroid = kw.get('centroid', (23.0, 34.0))
geometry = kw.get('geometry', 'POINT(%f %f)' % centroid)
self.add_data('placex',
{'place_id': kw.get('place_id', 1000),
'osm_type': kw.get('osm_type', 'W'),
@ -61,10 +64,11 @@ class APITester:
'rank_search': kw.get('rank_search', 30),
'rank_address': kw.get('rank_address', 30),
'importance': kw.get('importance'),
'centroid': 'SRID=4326;POINT(%f %f)' % kw.get('centroid', (23.0, 34.0)),
'centroid': 'SRID=4326;POINT(%f %f)' % centroid,
'indexed_status': kw.get('indexed_status', 0),
'indexed_date': kw.get('indexed_date',
dt.datetime(2022, 12, 7, 14, 14, 46, 0)),
'geometry': 'SRID=4326;' + kw.get('geometry', 'POINT(23 34)')})
'geometry': 'SRID=4326;' + geometry})
def add_address_placex(self, object_id, **kw):
@ -118,6 +122,13 @@ class APITester:
'geometry': 'SRID=4326;' + kw.get('geometry', 'POINT(23 34)')})
def add_country(self, country_code, geometry):
self.add_data('country_grid',
{'country_code': country_code,
'area': 0.1,
'geometry': 'SRID=4326;' + geometry})
async def exec_async(self, sql, *args, **kwargs):
async with self.api._async_api.begin() as conn:
return await conn.execute(sql, *args, **kwargs)
@ -136,8 +147,9 @@ def apiobj(temp_db_with_extensions, temp_db_conn, monkeypatch):
testapi = APITester()
testapi.async_to_sync(testapi.create_tables())
SQLPreprocessor(temp_db_conn, testapi.api.config)\
.run_sql_file(temp_db_conn, 'functions/address_lookup.sql')
proc = SQLPreprocessor(temp_db_conn, testapi.api.config)
proc.run_sql_file(temp_db_conn, 'functions/address_lookup.sql')
proc.run_sql_file(temp_db_conn, 'functions/ranking.sql')
loglib.set_log_output('text')
yield testapi

View File

@ -378,6 +378,10 @@ def test_lookup_in_tiger(apiobj):
startnumber=1, endnumber=4, step=1,
postcode='34425',
geometry='LINESTRING(23 34, 23 35)')
apiobj.add_placex(place_id=12,
category=('highway', 'residential'),
osm_type='W', osm_id=6601223,
geometry='LINESTRING(23 34, 23 35)')
result = apiobj.api.lookup(napi.PlaceID(4924), napi.LookupDetails())
@ -390,7 +394,7 @@ def test_lookup_in_tiger(apiobj):
assert result.place_id == 4924
assert result.parent_place_id == 12
assert result.linked_place_id is None
assert result.osm_object is None
assert result.osm_object == ('W', 6601223)
assert result.admin_level == 15
assert result.names is None

View File

@ -0,0 +1,346 @@
# SPDX-License-Identifier: GPL-3.0-or-later
#
# This file is part of Nominatim. (https://nominatim.org)
#
# Copyright (C) 2023 by the Nominatim developer community.
# For a full list of authors see the git log.
"""
Tests for reverse API call.
These tests make sure that all Python code is correct and executable.
Functional tests can be found in the BDD test suite.
"""
import json
import pytest
import nominatim.api as napi
def test_reverse_rank_30(apiobj):
apiobj.add_placex(place_id=223, class_='place', type='house',
housenumber='1',
centroid=(1.3, 0.7),
geometry='POINT(1.3 0.7)')
result = apiobj.api.reverse((1.3, 0.7))
assert result is not None
assert result.place_id == 223
@pytest.mark.parametrize('country', ['de', 'us'])
def test_reverse_street(apiobj, country):
apiobj.add_placex(place_id=990, class_='highway', type='service',
rank_search=27, rank_address=27,
name = {'name': 'My Street'},
centroid=(10.0, 10.0),
country_code=country,
geometry='LINESTRING(9.995 10, 10.005 10)')
assert apiobj.api.reverse((9.995, 10)).place_id == 990
def test_reverse_ignore_unindexed(apiobj):
apiobj.add_placex(place_id=223, class_='place', type='house',
housenumber='1',
indexed_status=2,
centroid=(1.3, 0.7),
geometry='POINT(1.3 0.7)')
result = apiobj.api.reverse((1.3, 0.7))
assert result is None
@pytest.mark.parametrize('y,layer,place_id', [(0.7, napi.DataLayer.ADDRESS, 223),
(0.70001, napi.DataLayer.POI, 224),
(0.7, napi.DataLayer.ADDRESS | napi.DataLayer.POI, 224),
(0.70001, napi.DataLayer.ADDRESS | napi.DataLayer.POI, 223),
(0.7, napi.DataLayer.MANMADE, 225),
(0.7, napi.DataLayer.RAILWAY, 226),
(0.7, napi.DataLayer.NATURAL, 227),
(0.70003, napi.DataLayer.MANMADE | napi.DataLayer.RAILWAY, 225),
(0.70003, napi.DataLayer.MANMADE | napi.DataLayer.NATURAL, 225)])
def test_reverse_rank_30_layers(apiobj, y, layer, place_id):
apiobj.add_placex(place_id=223, class_='place', type='house',
housenumber='1',
rank_address=30,
rank_search=30,
centroid=(1.3, 0.70001))
apiobj.add_placex(place_id=224, class_='amenity', type='toilet',
rank_address=30,
rank_search=30,
centroid=(1.3, 0.7))
apiobj.add_placex(place_id=225, class_='man_made', type='tower',
rank_address=0,
rank_search=30,
centroid=(1.3, 0.70003))
apiobj.add_placex(place_id=226, class_='railway', type='station',
rank_address=0,
rank_search=30,
centroid=(1.3, 0.70004))
apiobj.add_placex(place_id=227, class_='natural', type='cave',
rank_address=0,
rank_search=30,
centroid=(1.3, 0.70005))
assert apiobj.api.reverse((1.3, y), layer=layer).place_id == place_id
def test_reverse_poi_layer_with_no_pois(apiobj):
apiobj.add_placex(place_id=223, class_='place', type='house',
housenumber='1',
rank_address=30,
rank_search=30,
centroid=(1.3, 0.70001))
assert apiobj.api.reverse((1.3, 0.70001), max_rank=29,
layer=napi.DataLayer.POI) is None
def test_reverse_housenumber_on_street(apiobj):
apiobj.add_placex(place_id=990, class_='highway', type='service',
rank_search=27, rank_address=27,
name = {'name': 'My Street'},
centroid=(10.0, 10.0),
geometry='LINESTRING(9.995 10, 10.005 10)')
apiobj.add_placex(place_id=991, class_='place', type='house',
parent_place_id=990,
rank_search=30, rank_address=30,
housenumber='23',
centroid=(10.0, 10.00001))
assert apiobj.api.reverse((10.0, 10.0), max_rank=30).place_id == 991
assert apiobj.api.reverse((10.0, 10.0), max_rank=27).place_id == 990
assert apiobj.api.reverse((10.0, 10.00001), max_rank=30).place_id == 991
def test_reverse_housenumber_interpolation(apiobj):
apiobj.add_placex(place_id=990, class_='highway', type='service',
rank_search=27, rank_address=27,
name = {'name': 'My Street'},
centroid=(10.0, 10.0),
geometry='LINESTRING(9.995 10, 10.005 10)')
apiobj.add_placex(place_id=991, class_='place', type='house',
parent_place_id=990,
rank_search=30, rank_address=30,
housenumber='23',
centroid=(10.0, 10.00002))
apiobj.add_osmline(place_id=992,
parent_place_id=990,
startnumber=1, endnumber=3, step=1,
centroid=(10.0, 10.00001),
geometry='LINESTRING(9.995 10.00001, 10.005 10.00001)')
assert apiobj.api.reverse((10.0, 10.0)).place_id == 992
def test_reverse_housenumber_point_interpolation(apiobj):
apiobj.add_placex(place_id=990, class_='highway', type='service',
rank_search=27, rank_address=27,
name = {'name': 'My Street'},
centroid=(10.0, 10.0),
geometry='LINESTRING(9.995 10, 10.005 10)')
apiobj.add_osmline(place_id=992,
parent_place_id=990,
startnumber=42, endnumber=42, step=1,
centroid=(10.0, 10.00001),
geometry='POINT(10.0 10.00001)')
res = apiobj.api.reverse((10.0, 10.0))
assert res.place_id == 992
assert res.housenumber == '42'
def test_reverse_tiger_number(apiobj):
apiobj.add_placex(place_id=990, class_='highway', type='service',
rank_search=27, rank_address=27,
name = {'name': 'My Street'},
centroid=(10.0, 10.0),
country_code='us',
geometry='LINESTRING(9.995 10, 10.005 10)')
apiobj.add_tiger(place_id=992,
parent_place_id=990,
startnumber=1, endnumber=3, step=1,
centroid=(10.0, 10.00001),
geometry='LINESTRING(9.995 10.00001, 10.005 10.00001)')
assert apiobj.api.reverse((10.0, 10.0)).place_id == 992
assert apiobj.api.reverse((10.0, 10.00001)).place_id == 992
def test_reverse_point_tiger(apiobj):
apiobj.add_placex(place_id=990, class_='highway', type='service',
rank_search=27, rank_address=27,
name = {'name': 'My Street'},
centroid=(10.0, 10.0),
country_code='us',
geometry='LINESTRING(9.995 10, 10.005 10)')
apiobj.add_tiger(place_id=992,
parent_place_id=990,
startnumber=1, endnumber=1, step=1,
centroid=(10.0, 10.00001),
geometry='POINT(10.0 10.00001)')
res = apiobj.api.reverse((10.0, 10.0))
assert res.place_id == 992
assert res.housenumber == '1'
def test_reverse_low_zoom_address(apiobj):
apiobj.add_placex(place_id=1001, class_='place', type='house',
housenumber='1',
rank_address=30,
rank_search=30,
centroid=(59.3, 80.70001))
apiobj.add_placex(place_id=1002, class_='place', type='town',
name={'name': 'Town'},
rank_address=16,
rank_search=16,
centroid=(59.3, 80.70001),
geometry="""POLYGON((59.3 80.70001, 59.3001 80.70001,
59.3001 80.70101, 59.3 80.70101, 59.3 80.70001))""")
assert apiobj.api.reverse((59.30005, 80.7005)).place_id == 1001
assert apiobj.api.reverse((59.30005, 80.7005), max_rank=18).place_id == 1002
def test_reverse_place_node_in_area(apiobj):
apiobj.add_placex(place_id=1002, class_='place', type='town',
name={'name': 'Town Area'},
rank_address=16,
rank_search=16,
centroid=(59.3, 80.70001),
geometry="""POLYGON((59.3 80.70001, 59.3001 80.70001,
59.3001 80.70101, 59.3 80.70101, 59.3 80.70001))""")
apiobj.add_placex(place_id=1003, class_='place', type='suburb',
name={'name': 'Suburb Point'},
osm_type='N',
rank_address=18,
rank_search=18,
centroid=(59.30004, 80.70055))
assert apiobj.api.reverse((59.30004, 80.70055)).place_id == 1003
@pytest.mark.parametrize('layer,place_id', [(napi.DataLayer.MANMADE, 225),
(napi.DataLayer.RAILWAY, 226),
(napi.DataLayer.NATURAL, 227),
(napi.DataLayer.MANMADE | napi.DataLayer.RAILWAY, 225),
(napi.DataLayer.MANMADE | napi.DataLayer.NATURAL, 225)])
def test_reverse_larger_area_layers(apiobj, layer, place_id):
apiobj.add_placex(place_id=225, class_='man_made', type='dam',
name={'name': 'Dam'},
rank_address=0,
rank_search=25,
centroid=(1.3, 0.70003))
apiobj.add_placex(place_id=226, class_='railway', type='yard',
name={'name': 'Dam'},
rank_address=0,
rank_search=20,
centroid=(1.3, 0.70004))
apiobj.add_placex(place_id=227, class_='natural', type='spring',
name={'name': 'Dam'},
rank_address=0,
rank_search=16,
centroid=(1.3, 0.70005))
assert apiobj.api.reverse((1.3, 0.7), layer=layer).place_id == place_id
def test_reverse_country_lookup_no_objects(apiobj):
apiobj.add_country('xx', 'POLYGON((0 0, 0 1, 1 1, 1 0, 0 0))')
assert apiobj.api.reverse((0.5, 0.5)) is None
@pytest.mark.parametrize('rank', [4, 30])
def test_reverse_country_lookup_country_only(apiobj, rank):
apiobj.add_country('xx', 'POLYGON((0 0, 0 1, 1 1, 1 0, 0 0))')
apiobj.add_placex(place_id=225, class_='place', type='country',
name={'name': 'My Country'},
rank_address=4,
rank_search=4,
country_code='xx',
centroid=(0.7, 0.7))
assert apiobj.api.reverse((0.5, 0.5), max_rank=rank).place_id == 225
def test_reverse_country_lookup_place_node_inside(apiobj):
apiobj.add_country('xx', 'POLYGON((0 0, 0 1, 1 1, 1 0, 0 0))')
apiobj.add_placex(place_id=225, class_='place', type='state',
osm_type='N',
name={'name': 'My State'},
rank_address=6,
rank_search=6,
country_code='xx',
centroid=(0.5, 0.505))
assert apiobj.api.reverse((0.5, 0.5)).place_id == 225
@pytest.mark.parametrize('gtype', list(napi.GeometryFormat))
def test_reverse_geometry_output_placex(apiobj, gtype):
apiobj.add_country('xx', 'POLYGON((0 0, 0 1, 1 1, 1 0, 0 0))')
apiobj.add_placex(place_id=1001, class_='place', type='house',
housenumber='1',
rank_address=30,
rank_search=30,
centroid=(59.3, 80.70001))
apiobj.add_placex(place_id=1003, class_='place', type='suburb',
name={'name': 'Suburb Point'},
osm_type='N',
rank_address=18,
rank_search=18,
country_code='xx',
centroid=(0.5, 0.5))
details = napi.LookupDetails(geometry_output=gtype)
assert apiobj.api.reverse((59.3, 80.70001), details=details).place_id == 1001
assert apiobj.api.reverse((0.5, 0.5), details=details).place_id == 1003
def test_reverse_simplified_geometry(apiobj):
apiobj.add_placex(place_id=1001, class_='place', type='house',
housenumber='1',
rank_address=30,
rank_search=30,
centroid=(59.3, 80.70001))
details = napi.LookupDetails(geometry_output=napi.GeometryFormat.GEOJSON,
geometry_simplification=0.1)
assert apiobj.api.reverse((59.3, 80.70001), details=details).place_id == 1001
def test_reverse_interpolation_geometry(apiobj):
apiobj.add_osmline(place_id=992,
parent_place_id=990,
startnumber=1, endnumber=3, step=1,
centroid=(10.0, 10.00001),
geometry='LINESTRING(9.995 10.00001, 10.005 10.00001)')
details = napi.LookupDetails(geometry_output=napi.GeometryFormat.TEXT)
assert apiobj.api.reverse((10.0, 10.0), details=details)\
.geometry['text'] == 'POINT(10 10.00001)'
def test_reverse_tiger_geometry(apiobj):
apiobj.add_placex(place_id=990, class_='highway', type='service',
rank_search=27, rank_address=27,
name = {'name': 'My Street'},
centroid=(10.0, 10.0),
country_code='us',
geometry='LINESTRING(9.995 10, 10.005 10)')
apiobj.add_tiger(place_id=992,
parent_place_id=990,
startnumber=1, endnumber=3, step=1,
centroid=(10.0, 10.00001),
geometry='LINESTRING(9.995 10.00001, 10.005 10.00001)')
details = napi.LookupDetails(geometry_output=napi.GeometryFormat.GEOJSON)
output = apiobj.api.reverse((10.0, 10.0), details=details).geometry['geojson']
assert json.loads(output) == {'coordinates': [10, 10.00001], 'type': 'Point'}

View File

@ -6,6 +6,9 @@
# For a full list of authors see the git log.
"""
Tests for formatting results for the V1 API.
These test only ensure that the Python code is correct.
For functional tests see BDD test suite.
"""
import datetime as dt
import json
@ -59,14 +62,14 @@ def test_status_format_json_full():
assert result == '{"status":0,"message":"OK","data_updated":"2010-02-07T20:20:03+00:00","software_version":"%s","database_version":"5.6"}' % (NOMINATIM_VERSION, )
# SearchResult
# DetailedResult
def test_search_details_minimal():
search = napi.SearchResult(napi.SourceTable.PLACEX,
('place', 'thing'),
napi.Point(1.0, 2.0))
search = napi.DetailedResult(napi.SourceTable.PLACEX,
('place', 'thing'),
napi.Point(1.0, 2.0))
result = api_impl.format_result(search, 'details-json', {})
result = api_impl.format_result(search, 'json', {})
assert json.loads(result) == \
{'category': 'place',
@ -83,8 +86,8 @@ def test_search_details_minimal():
def test_search_details_full():
import_date = dt.datetime(2010, 2, 7, 20, 20, 3, 0)
search = napi.SearchResult(
import_date = dt.datetime(2010, 2, 7, 20, 20, 3, 0, tzinfo=dt.timezone.utc)
search = napi.DetailedResult(
source_table=napi.SourceTable.PLACEX,
category=('amenity', 'bank'),
centroid=napi.Point(56.947, -87.44),
@ -106,7 +109,7 @@ def test_search_details_full():
indexed_date = import_date
)
result = api_impl.format_result(search, 'details-json', {})
result = api_impl.format_result(search, 'json', {})
assert json.loads(result) == \
{'place_id': 37563,
@ -140,12 +143,12 @@ def test_search_details_full():
('ST_Polygon', True),
('ST_MultiPolygon', True)])
def test_search_details_no_geometry(gtype, isarea):
search = napi.SearchResult(napi.SourceTable.PLACEX,
search = napi.DetailedResult(napi.SourceTable.PLACEX,
('place', 'thing'),
napi.Point(1.0, 2.0),
geometry={'type': gtype})
result = api_impl.format_result(search, 'details-json', {})
result = api_impl.format_result(search, 'json', {})
js = json.loads(result)
assert js['geometry'] == {'type': 'Point', 'coordinates': [1.0, 2.0]}
@ -153,23 +156,45 @@ def test_search_details_no_geometry(gtype, isarea):
def test_search_details_with_geometry():
search = napi.SearchResult(napi.SourceTable.PLACEX,
('place', 'thing'),
napi.Point(1.0, 2.0),
geometry={'geojson': '{"type":"Point","coordinates":[56.947,-87.44]}'})
search = napi.DetailedResult(napi.SourceTable.PLACEX,
('place', 'thing'),
napi.Point(1.0, 2.0),
geometry={'geojson': '{"type":"Point","coordinates":[56.947,-87.44]}'})
result = api_impl.format_result(search, 'details-json', {})
result = api_impl.format_result(search, 'json', {})
js = json.loads(result)
assert js['geometry'] == {'type': 'Point', 'coordinates': [56.947, -87.44]}
assert js['isarea'] == False
def test_search_details_with_icon_available():
search = napi.DetailedResult(napi.SourceTable.PLACEX,
('amenity', 'restaurant'),
napi.Point(1.0, 2.0))
result = api_impl.format_result(search, 'json', {'icon_base_url': 'foo'})
js = json.loads(result)
assert js['icon'] == 'foo/food_restaurant.p.20.png'
def test_search_details_with_icon_not_available():
search = napi.DetailedResult(napi.SourceTable.PLACEX,
('amenity', 'tree'),
napi.Point(1.0, 2.0))
result = api_impl.format_result(search, 'json', {'icon_base_url': 'foo'})
js = json.loads(result)
assert 'icon' not in js
def test_search_details_with_address_minimal():
search = napi.SearchResult(napi.SourceTable.PLACEX,
('place', 'thing'),
napi.Point(1.0, 2.0),
address_rows=[
search = napi.DetailedResult(napi.SourceTable.PLACEX,
('place', 'thing'),
napi.Point(1.0, 2.0),
address_rows=[
napi.AddressLine(place_id=None,
osm_object=None,
category=('bnd', 'note'),
@ -180,9 +205,9 @@ def test_search_details_with_address_minimal():
isaddress=False,
rank_address=10,
distance=0.0)
])
])
result = api_impl.format_result(search, 'details-json', {})
result = api_impl.format_result(search, 'json', {})
js = json.loads(result)
assert js['address'] == [{'localname': '',
@ -193,28 +218,32 @@ def test_search_details_with_address_minimal():
'isaddress': False}]
def test_search_details_with_address_full():
search = napi.SearchResult(napi.SourceTable.PLACEX,
('place', 'thing'),
napi.Point(1.0, 2.0),
address_rows=[
napi.AddressLine(place_id=3498,
osm_object=('R', 442),
category=('bnd', 'note'),
names={'name': 'Trespass'},
extratags={'access': 'no',
'place_type': 'spec'},
admin_level=4,
fromarea=True,
isaddress=True,
rank_address=10,
distance=0.034)
])
@pytest.mark.parametrize('field,outfield', [('address_rows', 'address'),
('linked_rows', 'linked_places'),
('parented_rows', 'hierarchy')
])
def test_search_details_with_further_infos(field, outfield):
search = napi.DetailedResult(napi.SourceTable.PLACEX,
('place', 'thing'),
napi.Point(1.0, 2.0))
result = api_impl.format_result(search, 'details-json', {})
setattr(search, field, [napi.AddressLine(place_id=3498,
osm_object=('R', 442),
category=('bnd', 'note'),
names={'name': 'Trespass'},
extratags={'access': 'no',
'place_type': 'spec'},
admin_level=4,
fromarea=True,
isaddress=True,
rank_address=10,
distance=0.034)
])
result = api_impl.format_result(search, 'json', {})
js = json.loads(result)
assert js['address'] == [{'localname': 'Trespass',
assert js[outfield] == [{'localname': 'Trespass',
'place_id': 3498,
'osm_id': 442,
'osm_type': 'R',
@ -225,3 +254,70 @@ def test_search_details_with_address_full():
'rank_address': 10,
'distance': 0.034,
'isaddress': True}]
def test_search_details_grouped_hierarchy():
search = napi.DetailedResult(napi.SourceTable.PLACEX,
('place', 'thing'),
napi.Point(1.0, 2.0),
parented_rows =
[napi.AddressLine(place_id=3498,
osm_object=('R', 442),
category=('bnd', 'note'),
names={'name': 'Trespass'},
extratags={'access': 'no',
'place_type': 'spec'},
admin_level=4,
fromarea=True,
isaddress=True,
rank_address=10,
distance=0.034)
])
result = api_impl.format_result(search, 'json', {'group_hierarchy': True})
js = json.loads(result)
assert js['hierarchy'] == {'note': [{'localname': 'Trespass',
'place_id': 3498,
'osm_id': 442,
'osm_type': 'R',
'place_type': 'spec',
'class': 'bnd',
'type': 'note',
'admin_level': 4,
'rank_address': 10,
'distance': 0.034,
'isaddress': True}]}
def test_search_details_keywords_name():
search = napi.DetailedResult(napi.SourceTable.PLACEX,
('place', 'thing'),
napi.Point(1.0, 2.0),
name_keywords=[
napi.WordInfo(23, 'foo', 'mefoo'),
napi.WordInfo(24, 'foo', 'bafoo')])
result = api_impl.format_result(search, 'json', {'keywords': True})
js = json.loads(result)
assert js['keywords'] == {'name': [{'id': 23, 'token': 'foo'},
{'id': 24, 'token': 'foo'}],
'address': []}
def test_search_details_keywords_address():
search = napi.DetailedResult(napi.SourceTable.PLACEX,
('place', 'thing'),
napi.Point(1.0, 2.0),
address_keywords=[
napi.WordInfo(23, 'foo', 'mefoo'),
napi.WordInfo(24, 'foo', 'bafoo')])
result = api_impl.format_result(search, 'json', {'keywords': True})
js = json.loads(result)
assert js['keywords'] == {'address': [{'id': 23, 'token': 'foo'},
{'id': 24, 'token': 'foo'}],
'name': []}

View File

@ -0,0 +1,320 @@
# SPDX-License-Identifier: GPL-2.0-only
#
# This file is part of Nominatim. (https://nominatim.org)
#
# Copyright (C) 2023 by the Nominatim developer community.
# For a full list of authors see the git log.
"""
Tests for formatting reverse results for the V1 API.
These test only ensure that the Python code is correct.
For functional tests see BDD test suite.
"""
import json
import xml.etree.ElementTree as ET
import pytest
import nominatim.api.v1 as api_impl
import nominatim.api as napi
FORMATS = ['json', 'jsonv2', 'geojson', 'geocodejson', 'xml']
@pytest.mark.parametrize('fmt', FORMATS)
def test_format_reverse_minimal(fmt):
reverse = napi.ReverseResult(napi.SourceTable.PLACEX,
('amenity', 'post_box'),
napi.Point(0.3, -8.9))
raw = api_impl.format_result(napi.ReverseResults([reverse]), fmt, {})
if fmt == 'xml':
root = ET.fromstring(raw)
assert root.tag == 'reversegeocode'
else:
result = json.loads(raw)
assert isinstance(result, dict)
@pytest.mark.parametrize('fmt', FORMATS)
def test_format_reverse_no_result(fmt):
raw = api_impl.format_result(napi.ReverseResults(), fmt, {})
if fmt == 'xml':
root = ET.fromstring(raw)
assert root.find('error').text == 'Unable to geocode'
else:
assert json.loads(raw) == {'error': 'Unable to geocode'}
@pytest.mark.parametrize('fmt', FORMATS)
def test_format_reverse_with_osm_id(fmt):
reverse = napi.ReverseResult(napi.SourceTable.PLACEX,
('amenity', 'post_box'),
napi.Point(0.3, -8.9),
place_id=5564,
osm_object=('N', 23))
raw = api_impl.format_result(napi.ReverseResults([reverse]), fmt, {})
if fmt == 'xml':
root = ET.fromstring(raw).find('result')
assert root.attrib['osm_type'] == 'node'
assert root.attrib['osm_id'] == '23'
else:
result = json.loads(raw)
if fmt == 'geocodejson':
props = result['features'][0]['properties']['geocoding']
elif fmt == 'geojson':
props = result['features'][0]['properties']
else:
props = result
assert props['osm_type'] == 'node'
assert props['osm_id'] == 23
@pytest.mark.parametrize('fmt', FORMATS)
def test_format_reverse_with_address(fmt):
reverse = napi.ReverseResult(napi.SourceTable.PLACEX,
('place', 'thing'),
napi.Point(1.0, 2.0),
country_code='fe',
address_rows=napi.AddressLines([
napi.AddressLine(place_id=None,
osm_object=None,
category=('place', 'county'),
names={'name': 'Hello'},
extratags=None,
admin_level=5,
fromarea=False,
isaddress=True,
rank_address=10,
distance=0.0),
napi.AddressLine(place_id=None,
osm_object=None,
category=('place', 'county'),
names={'name': 'ByeBye'},
extratags=None,
admin_level=5,
fromarea=False,
isaddress=False,
rank_address=10,
distance=0.0)
]))
raw = api_impl.format_result(napi.ReverseResults([reverse]), fmt,
{'addressdetails': True})
if fmt == 'xml':
root = ET.fromstring(raw)
assert root.find('addressparts').find('county').text == 'Hello'
else:
result = json.loads(raw)
assert isinstance(result, dict)
if fmt == 'geocodejson':
props = result['features'][0]['properties']['geocoding']
assert 'admin' in props
assert props['county'] == 'Hello'
else:
if fmt == 'geojson':
props = result['features'][0]['properties']
else:
props = result
assert 'address' in props
def test_format_reverse_geocodejson_special_parts():
reverse = napi.ReverseResult(napi.SourceTable.PLACEX,
('place', 'house'),
napi.Point(1.0, 2.0),
place_id=33,
country_code='fe',
address_rows=napi.AddressLines([
napi.AddressLine(place_id=None,
osm_object=None,
category=('place', 'house_number'),
names={'ref': '1'},
extratags=None,
admin_level=15,
fromarea=False,
isaddress=True,
rank_address=10,
distance=0.0),
napi.AddressLine(place_id=None,
osm_object=None,
category=('place', 'postcode'),
names={'ref': '99446'},
extratags=None,
admin_level=11,
fromarea=False,
isaddress=True,
rank_address=10,
distance=0.0),
napi.AddressLine(place_id=33,
osm_object=None,
category=('place', 'county'),
names={'name': 'Hello'},
extratags=None,
admin_level=5,
fromarea=False,
isaddress=True,
rank_address=10,
distance=0.0)
]))
raw = api_impl.format_result(napi.ReverseResults([reverse]), 'geocodejson',
{'addressdetails': True})
props = json.loads(raw)['features'][0]['properties']['geocoding']
assert props['housenumber'] == '1'
assert props['postcode'] == '99446'
assert 'county' not in props
@pytest.mark.parametrize('fmt', FORMATS)
def test_format_reverse_with_address_none(fmt):
reverse = napi.ReverseResult(napi.SourceTable.PLACEX,
('place', 'thing'),
napi.Point(1.0, 2.0),
address_rows=napi.AddressLines())
raw = api_impl.format_result(napi.ReverseResults([reverse]), fmt,
{'addressdetails': True})
if fmt == 'xml':
root = ET.fromstring(raw)
assert root.find('addressparts') is None
else:
result = json.loads(raw)
assert isinstance(result, dict)
if fmt == 'geocodejson':
props = result['features'][0]['properties']['geocoding']
print(props)
assert 'admin' in props
else:
if fmt == 'geojson':
props = result['features'][0]['properties']
else:
props = result
assert 'address' in props
@pytest.mark.parametrize('fmt', ['json', 'jsonv2', 'geojson', 'xml'])
def test_format_reverse_with_extratags(fmt):
reverse = napi.ReverseResult(napi.SourceTable.PLACEX,
('place', 'thing'),
napi.Point(1.0, 2.0),
extratags={'one': 'A', 'two':'B'})
raw = api_impl.format_result(napi.ReverseResults([reverse]), fmt,
{'extratags': True})
if fmt == 'xml':
root = ET.fromstring(raw)
assert root.find('extratags').find('tag').attrib['key'] == 'one'
else:
result = json.loads(raw)
if fmt == 'geojson':
extra = result['features'][0]['properties']['extratags']
else:
extra = result['extratags']
assert extra == {'one': 'A', 'two':'B'}
@pytest.mark.parametrize('fmt', ['json', 'jsonv2', 'geojson', 'xml'])
def test_format_reverse_with_extratags_none(fmt):
reverse = napi.ReverseResult(napi.SourceTable.PLACEX,
('place', 'thing'),
napi.Point(1.0, 2.0))
raw = api_impl.format_result(napi.ReverseResults([reverse]), fmt,
{'extratags': True})
if fmt == 'xml':
root = ET.fromstring(raw)
assert root.find('extratags') is not None
else:
result = json.loads(raw)
if fmt == 'geojson':
extra = result['features'][0]['properties']['extratags']
else:
extra = result['extratags']
assert extra is None
@pytest.mark.parametrize('fmt', ['json', 'jsonv2', 'geojson', 'xml'])
def test_format_reverse_with_namedetails_with_name(fmt):
reverse = napi.ReverseResult(napi.SourceTable.PLACEX,
('place', 'thing'),
napi.Point(1.0, 2.0),
names={'name': 'A', 'ref':'1'})
raw = api_impl.format_result(napi.ReverseResults([reverse]), fmt,
{'namedetails': True})
if fmt == 'xml':
root = ET.fromstring(raw)
assert root.find('namedetails').find('name').text == 'A'
else:
result = json.loads(raw)
if fmt == 'geojson':
extra = result['features'][0]['properties']['namedetails']
else:
extra = result['namedetails']
assert extra == {'name': 'A', 'ref':'1'}
@pytest.mark.parametrize('fmt', ['json', 'jsonv2', 'geojson', 'xml'])
def test_format_reverse_with_namedetails_without_name(fmt):
reverse = napi.ReverseResult(napi.SourceTable.PLACEX,
('place', 'thing'),
napi.Point(1.0, 2.0))
raw = api_impl.format_result(napi.ReverseResults([reverse]), fmt,
{'namedetails': True})
if fmt == 'xml':
root = ET.fromstring(raw)
assert root.find('namedetails') is not None
else:
result = json.loads(raw)
if fmt == 'geojson':
extra = result['features'][0]['properties']['namedetails']
else:
extra = result['namedetails']
assert extra is None
@pytest.mark.parametrize('fmt', ['json', 'jsonv2'])
def test_search_details_with_icon_available(fmt):
reverse = napi.ReverseResult(napi.SourceTable.PLACEX,
('amenity', 'restaurant'),
napi.Point(1.0, 2.0))
result = api_impl.format_result(napi.ReverseResults([reverse]), fmt,
{'icon_base_url': 'foo'})
js = json.loads(result)
assert js['icon'] == 'foo/food_restaurant.p.20.png'
@pytest.mark.parametrize('fmt', ['json', 'jsonv2'])
def test_search_details_with_icon_not_available(fmt):
reverse = napi.ReverseResult(napi.SourceTable.PLACEX,
('amenity', 'tree'),
napi.Point(1.0, 2.0))
result = api_impl.format_result(napi.ReverseResults([reverse]), fmt,
{'icon_base_url': 'foo'})
assert 'icon' not in json.loads(result)

View File

@ -0,0 +1,84 @@
# SPDX-License-Identifier: GPL-3.0-or-later
#
# This file is part of Nominatim. (https://nominatim.org)
#
# Copyright (C) 2023 by the Nominatim developer community.
# For a full list of authors see the git log.
"""
Tests for result datatype helper functions.
"""
import struct
import pytest
import pytest_asyncio
import sqlalchemy as sa
from nominatim.api import SourceTable, DetailedResult, Point
import nominatim.api.results as nresults
class FakeCentroid:
def __init__(self, x, y):
self.data = struct.pack("=biidd", 1, 0x20000001, 4326,
x, y)
class FakeRow:
def __init__(self, **kwargs):
for k, v in kwargs.items():
setattr(self, k, v)
self._mapping = kwargs
def test_minimal_detailed_result():
res = DetailedResult(SourceTable.PLACEX,
('amenity', 'post_box'),
Point(23.1, 0.5))
assert res.lon == 23.1
assert res.lat == 0.5
assert res.calculated_importance() == pytest.approx(0.0000001)
def test_detailed_result_custom_importance():
res = DetailedResult(SourceTable.PLACEX,
('amenity', 'post_box'),
Point(23.1, 0.5),
importance=0.4563)
assert res.calculated_importance() == 0.4563
@pytest.mark.parametrize('func', (nresults.create_from_placex_row,
nresults.create_from_osmline_row,
nresults.create_from_tiger_row,
nresults.create_from_postcode_row))
def test_create_row_none(func):
assert func(None, DetailedResult) is None
@pytest.mark.parametrize('func', (nresults.create_from_osmline_row,
nresults.create_from_tiger_row))
def test_create_row_with_housenumber(func):
row = FakeRow(place_id=2345, osm_type='W', osm_id=111, housenumber=4,
address=None, postcode='99900', country_code='xd',
centroid=FakeCentroid(0, 0))
res = func(row, DetailedResult)
assert res.housenumber == '4'
assert res.extratags is None
assert res.category == ('place', 'house')
@pytest.mark.parametrize('func', (nresults.create_from_osmline_row,
nresults.create_from_tiger_row))
def test_create_row_without_housenumber(func):
row = FakeRow(place_id=2345, osm_type='W', osm_id=111,
startnumber=1, endnumber=11, step=2,
address=None, postcode='99900', country_code='xd',
centroid=FakeCentroid(0, 0))
res = func(row, DetailedResult)
assert res.housenumber is None
assert res.extratags == {'startnumber': '1', 'endnumber': '11', 'step': '2'}
assert res.category == ('place', 'houses')

View File

@ -0,0 +1,386 @@
# SPDX-License-Identifier: GPL-3.0-or-later
#
# This file is part of Nominatim. (https://nominatim.org)
#
# Copyright (C) 2023 by the Nominatim developer community.
# For a full list of authors see the git log.
"""
Tests for the Python web frameworks adaptor, v1 API.
"""
from collections import namedtuple
import json
import xml.etree.ElementTree as ET
from pathlib import Path
import pytest
from nominatim.config import Configuration
import nominatim.api.v1.server_glue as glue
import nominatim.api as napi
import nominatim.api.logging as loglib
class FakeError(BaseException):
def __init__(self, msg, status):
self.msg = msg
self.status = status
def __str__(self):
return f'{self.status} -- {self.msg}'
FakeResponse = namedtuple('FakeResponse', ['status', 'output', 'content_type'])
class FakeAdaptor(glue.ASGIAdaptor):
def __init__(self, params={}, headers={}, config=None):
self.params = params
self.headers = headers
self._config = config or Configuration(None)
def get(self, name, default=None):
return self.params.get(name, default)
def get_header(self, name, default=None):
return self.headers.get(name, default)
def error(self, msg, status=400):
return FakeError(msg, status)
def create_response(self, status, output):
return FakeResponse(status, output, self.content_type)
def config(self):
return self._config
# ASGIAdaptor.get_int/bool()
@pytest.mark.parametrize('func', ['get_int', 'get_bool'])
def test_adaptor_get_int_missing_but_required(func):
with pytest.raises(FakeError, match='^400 -- .*missing'):
getattr(FakeAdaptor(), func)('something')
@pytest.mark.parametrize('func, val', [('get_int', 23), ('get_bool', True)])
def test_adaptor_get_int_missing_with_default(func, val):
assert getattr(FakeAdaptor(), func)('something', val) == val
@pytest.mark.parametrize('inp', ['0', '234', '-4566953498567934876'])
def test_adaptor_get_int_success(inp):
assert FakeAdaptor(params={'foo': inp}).get_int('foo') == int(inp)
assert FakeAdaptor(params={'foo': inp}).get_int('foo', 4) == int(inp)
@pytest.mark.parametrize('inp', ['rs', '4.5', '6f'])
def test_adaptor_get_int_bad_number(inp):
with pytest.raises(FakeError, match='^400 -- .*must be a number'):
FakeAdaptor(params={'foo': inp}).get_int('foo')
@pytest.mark.parametrize('inp', ['1', 'true', 'whatever', 'false'])
def test_adaptor_get_bool_trueish(inp):
assert FakeAdaptor(params={'foo': inp}).get_bool('foo')
def test_adaptor_get_bool_falsish():
assert not FakeAdaptor(params={'foo': '0'}).get_bool('foo')
# ASGIAdaptor.parse_format()
def test_adaptor_parse_format_use_default():
adaptor = FakeAdaptor()
assert adaptor.parse_format(napi.StatusResult, 'text') == 'text'
assert adaptor.content_type == 'text/plain; charset=utf-8'
def test_adaptor_parse_format_use_configured():
adaptor = FakeAdaptor(params={'format': 'json'})
assert adaptor.parse_format(napi.StatusResult, 'text') == 'json'
assert adaptor.content_type == 'application/json'
def test_adaptor_parse_format_invalid_value():
adaptor = FakeAdaptor(params={'format': '@!#'})
with pytest.raises(FakeError, match='^400 -- .*must be one of'):
adaptor.parse_format(napi.StatusResult, 'text')
# ASGIAdaptor.get_accepted_languages()
def test_accepted_languages_from_param():
a = FakeAdaptor(params={'accept-language': 'de'})
assert a.get_accepted_languages() == 'de'
def test_accepted_languages_from_header():
a = FakeAdaptor(headers={'http_accept_language': 'de'})
assert a.get_accepted_languages() == 'de'
def test_accepted_languages_from_default(monkeypatch):
monkeypatch.setenv('NOMINATIM_DEFAULT_LANGUAGE', 'de')
a = FakeAdaptor()
assert a.get_accepted_languages() == 'de'
def test_accepted_languages_param_over_header():
a = FakeAdaptor(params={'accept-language': 'de'},
headers={'http_accept_language': 'en'})
assert a.get_accepted_languages() == 'de'
def test_accepted_languages_header_over_default(monkeypatch):
monkeypatch.setenv('NOMINATIM_DEFAULT_LANGUAGE', 'en')
a = FakeAdaptor(headers={'http_accept_language': 'de'})
assert a.get_accepted_languages() == 'de'
# ASGIAdaptor.raise_error()
class TestAdaptorRaiseError:
@pytest.fixture(autouse=True)
def init_adaptor(self):
self.adaptor = FakeAdaptor()
self.adaptor.setup_debugging()
def run_raise_error(self, msg, status):
with pytest.raises(FakeError) as excinfo:
self.adaptor.raise_error(msg, status=status)
return excinfo.value
def test_without_content_set(self):
err = self.run_raise_error('TEST', 404)
assert self.adaptor.content_type == 'text/plain; charset=utf-8'
assert err.msg == 'TEST'
assert err.status == 404
def test_json(self):
self.adaptor.content_type = 'application/json'
err = self.run_raise_error('TEST', 501)
content = json.loads(err.msg)['error']
assert content['code'] == 501
assert content['message'] == 'TEST'
def test_xml(self):
self.adaptor.content_type = 'text/xml; charset=utf-8'
err = self.run_raise_error('this!', 503)
content = ET.fromstring(err.msg)
assert content.tag == 'error'
assert content.find('code').text == '503'
assert content.find('message').text == 'this!'
def test_raise_error_during_debug():
a = FakeAdaptor(params={'debug': '1'})
a.setup_debugging()
loglib.log().section('Ongoing')
with pytest.raises(FakeError) as excinfo:
a.raise_error('bad state')
content = ET.fromstring(excinfo.value.msg)
assert content.tag == 'html'
assert '>Ongoing<' in excinfo.value.msg
assert 'bad state' in excinfo.value.msg
# ASGIAdaptor.build_response
def test_build_response_without_content_type():
resp = FakeAdaptor().build_response('attention')
assert isinstance(resp, FakeResponse)
assert resp.status == 200
assert resp.output == 'attention'
assert resp.content_type == 'text/plain; charset=utf-8'
def test_build_response_with_status():
a = FakeAdaptor(params={'format': 'json'})
a.parse_format(napi.StatusResult, 'text')
resp = a.build_response('stuff\nmore stuff', status=404)
assert isinstance(resp, FakeResponse)
assert resp.status == 404
assert resp.output == 'stuff\nmore stuff'
assert resp.content_type == 'application/json'
def test_build_response_jsonp_with_json():
a = FakeAdaptor(params={'format': 'json', 'json_callback': 'test.func'})
a.parse_format(napi.StatusResult, 'text')
resp = a.build_response('{}')
assert isinstance(resp, FakeResponse)
assert resp.status == 200
assert resp.output == 'test.func({})'
assert resp.content_type == 'application/javascript'
def test_build_response_jsonp_without_json():
a = FakeAdaptor(params={'format': 'text', 'json_callback': 'test.func'})
a.parse_format(napi.StatusResult, 'text')
resp = a.build_response('{}')
assert isinstance(resp, FakeResponse)
assert resp.status == 200
assert resp.output == '{}'
assert resp.content_type == 'text/plain; charset=utf-8'
@pytest.mark.parametrize('param', ['alert(); func', '\\n', '', 'a b'])
def test_build_response_jsonp_bad_format(param):
a = FakeAdaptor(params={'format': 'json', 'json_callback': param})
a.parse_format(napi.StatusResult, 'text')
with pytest.raises(FakeError, match='^400 -- .*Invalid'):
a.build_response('{}')
# status_endpoint()
class TestStatusEndpoint:
@pytest.fixture(autouse=True)
def patch_status_func(self, monkeypatch):
async def _status(*args, **kwargs):
return self.status
monkeypatch.setattr(napi.NominatimAPIAsync, 'status', _status)
@pytest.mark.asyncio
async def test_status_without_params(self):
a = FakeAdaptor()
self.status = napi.StatusResult(0, 'foo')
resp = await glue.status_endpoint(napi.NominatimAPIAsync(Path('/invalid')), a)
assert isinstance(resp, FakeResponse)
assert resp.status == 200
assert resp.content_type == 'text/plain; charset=utf-8'
@pytest.mark.asyncio
async def test_status_with_error(self):
a = FakeAdaptor()
self.status = napi.StatusResult(405, 'foo')
resp = await glue.status_endpoint(napi.NominatimAPIAsync(Path('/invalid')), a)
assert isinstance(resp, FakeResponse)
assert resp.status == 500
assert resp.content_type == 'text/plain; charset=utf-8'
@pytest.mark.asyncio
async def test_status_json_with_error(self):
a = FakeAdaptor(params={'format': 'json'})
self.status = napi.StatusResult(405, 'foo')
resp = await glue.status_endpoint(napi.NominatimAPIAsync(Path('/invalid')), a)
assert isinstance(resp, FakeResponse)
assert resp.status == 200
assert resp.content_type == 'application/json'
@pytest.mark.asyncio
async def test_status_bad_format(self):
a = FakeAdaptor(params={'format': 'foo'})
self.status = napi.StatusResult(0, 'foo')
with pytest.raises(FakeError):
await glue.status_endpoint(napi.NominatimAPIAsync(Path('/invalid')), a)
# details_endpoint()
class TestDetailsEndpoint:
@pytest.fixture(autouse=True)
def patch_lookup_func(self, monkeypatch):
self.result = napi.DetailedResult(napi.SourceTable.PLACEX,
('place', 'thing'),
napi.Point(1.0, 2.0))
self.lookup_args = []
async def _lookup(*args, **kwargs):
self.lookup_args.extend(args[1:])
return self.result
monkeypatch.setattr(napi.NominatimAPIAsync, 'lookup', _lookup)
@pytest.mark.asyncio
async def test_details_no_params(self):
a = FakeAdaptor()
with pytest.raises(FakeError, match='^400 -- .*Missing'):
await glue.details_endpoint(napi.NominatimAPIAsync(Path('/invalid')), a)
@pytest.mark.asyncio
async def test_details_by_place_id(self):
a = FakeAdaptor(params={'place_id': '4573'})
await glue.details_endpoint(napi.NominatimAPIAsync(Path('/invalid')), a)
assert self.lookup_args[0].place_id == 4573
@pytest.mark.asyncio
async def test_details_by_osm_id(self):
a = FakeAdaptor(params={'osmtype': 'N', 'osmid': '45'})
await glue.details_endpoint(napi.NominatimAPIAsync(Path('/invalid')), a)
assert self.lookup_args[0].osm_type == 'N'
assert self.lookup_args[0].osm_id == 45
assert self.lookup_args[0].osm_class is None
@pytest.mark.asyncio
async def test_details_with_debugging(self):
a = FakeAdaptor(params={'osmtype': 'N', 'osmid': '45', 'debug': '1'})
resp = await glue.details_endpoint(napi.NominatimAPIAsync(Path('/invalid')), a)
content = ET.fromstring(resp.output)
assert resp.content_type == 'text/html; charset=utf-8'
assert content.tag == 'html'
@pytest.mark.asyncio
async def test_details_no_result(self):
a = FakeAdaptor(params={'place_id': '4573'})
self.result = None
with pytest.raises(FakeError, match='^404 -- .*found'):
await glue.details_endpoint(napi.NominatimAPIAsync(Path('/invalid')), a)

View File

@ -24,7 +24,6 @@ def test_no_api_without_phpcgi(endpoint):
@pytest.mark.parametrize("params", [('search', '--query', 'new'),
('search', '--city', 'Berlin'),
('reverse', '--lat', '0', '--lon', '0', '--zoom', '13'),
('lookup', '--id', 'N1')])
class TestCliApiCallPhp:
@ -79,8 +78,8 @@ class TestCliDetailsCall:
@pytest.fixture(autouse=True)
def setup_status_mock(self, monkeypatch):
result = napi.SearchResult(napi.SourceTable.PLACEX, ('place', 'thing'),
napi.Point(1.0, -3.0))
result = napi.DetailedResult(napi.SourceTable.PLACEX, ('place', 'thing'),
napi.Point(1.0, -3.0))
monkeypatch.setattr(napi.NominatimAPI, 'lookup',
lambda *args: result)
@ -98,6 +97,65 @@ class TestCliDetailsCall:
json.loads(capsys.readouterr().out)
class TestCliReverseCall:
@pytest.fixture(autouse=True)
def setup_reverse_mock(self, monkeypatch):
result = napi.ReverseResult(napi.SourceTable.PLACEX, ('place', 'thing'),
napi.Point(1.0, -3.0),
names={'name':'Name', 'name:fr': 'Nom'},
extratags={'extra':'Extra'})
monkeypatch.setattr(napi.NominatimAPI, 'reverse',
lambda *args: result)
def test_reverse_simple(self, cli_call, tmp_path, capsys):
result = cli_call('reverse', '--project-dir', str(tmp_path),
'--lat', '34', '--lon', '34')
assert result == 0
out = json.loads(capsys.readouterr().out)
assert out['name'] == 'Name'
assert 'address' not in out
assert 'extratags' not in out
assert 'namedetails' not in out
@pytest.mark.parametrize('param,field', [('--addressdetails', 'address'),
('--extratags', 'extratags'),
('--namedetails', 'namedetails')])
def test_reverse_extra_stuff(self, cli_call, tmp_path, capsys, param, field):
result = cli_call('reverse', '--project-dir', str(tmp_path),
'--lat', '34', '--lon', '34', param)
assert result == 0
out = json.loads(capsys.readouterr().out)
assert field in out
def test_reverse_format(self, cli_call, tmp_path, capsys):
result = cli_call('reverse', '--project-dir', str(tmp_path),
'--lat', '34', '--lon', '34', '--format', 'geojson')
assert result == 0
out = json.loads(capsys.readouterr().out)
assert out['type'] == 'FeatureCollection'
def test_reverse_language(self, cli_call, tmp_path, capsys):
result = cli_call('reverse', '--project-dir', str(tmp_path),
'--lat', '34', '--lon', '34', '--lang', 'fr')
assert result == 0
out = json.loads(capsys.readouterr().out)
assert out['name'] == 'Nom'
QUERY_PARAMS = {
'search': ('--query', 'somewhere'),
'reverse': ('--lat', '20', '--lon', '30'),
@ -105,7 +163,7 @@ QUERY_PARAMS = {
'details': ('--node', '324')
}
@pytest.mark.parametrize("endpoint", (('search', 'reverse', 'lookup')))
@pytest.mark.parametrize("endpoint", (('search', 'lookup')))
class TestCliApiCommonParameters:
@pytest.fixture(autouse=True)