Nominatim/nominatim/api/results.py
2023-07-14 15:27:16 +02:00

575 lines
20 KiB
Python

# SPDX-License-Identifier: GPL-3.0-or-later
#
# This file is part of Nominatim. (https://nominatim.org)
#
# Copyright (C) 2023 by the Nominatim developer community.
# For a full list of authors see the git log.
"""
Dataclasses for search results and helper functions to fill them.
Data classes are part of the public API while the functions are for
internal use only. That's why they are implemented as free-standing functions
instead of member functions.
"""
from typing import Optional, Tuple, Dict, Sequence, TypeVar, Type, List, Any, Union
import enum
import dataclasses
import datetime as dt
import sqlalchemy as sa
from nominatim.typing import SaSelect, SaRow, SaColumn
from nominatim.api.types import Point, Bbox, LookupDetails
from nominatim.api.connection import SearchConnection
from nominatim.api.logging import log
from nominatim.api.localization import Locales
# This file defines complex result data classes.
# pylint: disable=too-many-instance-attributes
def _mingle_name_tags(names: Optional[Dict[str, str]]) -> Optional[Dict[str, str]]:
""" Mix-in names from linked places, so that they show up
as standard names where necessary.
"""
if not names:
return None
out = {}
for k, v in names.items():
if k.startswith('_place_'):
outkey = k[7:]
out[k if outkey in names else outkey] = v
else:
out[k] = v
return out
class SourceTable(enum.Enum):
""" Enumeration of kinds of results.
"""
PLACEX = 1
OSMLINE = 2
TIGER = 3
POSTCODE = 4
COUNTRY = 5
@dataclasses.dataclass
class AddressLine:
""" Detailed information about a related place.
"""
place_id: Optional[int]
osm_object: Optional[Tuple[str, int]]
category: Tuple[str, str]
names: Dict[str, str]
extratags: Optional[Dict[str, str]]
admin_level: Optional[int]
fromarea: bool
isaddress: bool
rank_address: int
distance: float
local_name: Optional[str] = None
class AddressLines(List[AddressLine]):
""" Sequence of address lines order in descending order by their rank.
"""
def localize(self, locales: Locales) -> List[str]:
""" Set the local name of address parts according to the chosen
locale. Return the list of local names without duplications.
Only address parts that are marked as isaddress are localized
and returned.
"""
label_parts: List[str] = []
for line in self:
if line.isaddress and line.names:
line.local_name = locales.display_name(line.names)
if not label_parts or label_parts[-1] != line.local_name:
label_parts.append(line.local_name)
return label_parts
@dataclasses.dataclass
class WordInfo:
""" Detailed information about a search term.
"""
word_id: int
word_token: str
word: Optional[str] = None
WordInfos = Sequence[WordInfo]
@dataclasses.dataclass
class BaseResult:
""" Data class collecting information common to all
types of search results.
"""
source_table: SourceTable
category: Tuple[str, str]
centroid: Point
place_id : Optional[int] = None
osm_object: Optional[Tuple[str, int]] = None
locale_name: Optional[str] = None
display_name: Optional[str] = None
names: Optional[Dict[str, str]] = None
address: Optional[Dict[str, str]] = None
extratags: Optional[Dict[str, str]] = None
housenumber: Optional[str] = None
postcode: Optional[str] = None
wikipedia: Optional[str] = None
rank_address: int = 30
rank_search: int = 30
importance: Optional[float] = None
country_code: Optional[str] = None
address_rows: Optional[AddressLines] = None
linked_rows: Optional[AddressLines] = None
parented_rows: Optional[AddressLines] = None
name_keywords: Optional[WordInfos] = None
address_keywords: Optional[WordInfos] = None
geometry: Dict[str, str] = dataclasses.field(default_factory=dict)
@property
def lat(self) -> float:
""" Get the latitude (or y) of the center point of the place.
"""
return self.centroid[1]
@property
def lon(self) -> float:
""" Get the longitude (or x) of the center point of the place.
"""
return self.centroid[0]
def calculated_importance(self) -> float:
""" Get a valid importance value. This is either the stored importance
of the value or an artificial value computed from the place's
search rank.
"""
return self.importance or (0.7500001 - (self.rank_search/40.0))
def localize(self, locales: Locales) -> None:
""" Fill the locale_name and the display_name field for the
place and, if available, its address information.
"""
self.locale_name = locales.display_name(self.names)
if self.address_rows:
self.display_name = ', '.join(self.address_rows.localize(locales))
else:
self.display_name = self.locale_name
BaseResultT = TypeVar('BaseResultT', bound=BaseResult)
@dataclasses.dataclass
class DetailedResult(BaseResult):
""" A search result with more internal information from the database
added.
"""
parent_place_id: Optional[int] = None
linked_place_id: Optional[int] = None
admin_level: int = 15
indexed_date: Optional[dt.datetime] = None
@dataclasses.dataclass
class ReverseResult(BaseResult):
""" A search result for reverse geocoding.
"""
distance: Optional[float] = None
bbox: Optional[Bbox] = None
class ReverseResults(List[ReverseResult]):
""" Sequence of reverse lookup results ordered by distance.
May be empty when no result was found.
"""
@dataclasses.dataclass
class SearchResult(BaseResult):
""" A search result for forward geocoding.
"""
bbox: Optional[Bbox] = None
accuracy: float = 0.0
@property
def ranking(self) -> float:
""" Return the ranking, a combined measure of accuracy and importance.
"""
return (self.accuracy if self.accuracy is not None else 1) \
- self.calculated_importance()
class SearchResults(List[SearchResult]):
""" Sequence of forward lookup results ordered by relevance.
May be empty when no result was found.
"""
def localize(self, locales: Locales) -> None:
""" Apply the given locales to all results.
"""
for result in self:
result.localize(locales)
def _filter_geometries(row: SaRow) -> Dict[str, str]:
return {k[9:]: v for k, v in row._mapping.items() # pylint: disable=W0212
if k.startswith('geometry_')}
def create_from_placex_row(row: Optional[SaRow],
class_type: Type[BaseResultT]) -> Optional[BaseResultT]:
""" Construct a new result and add the data from the result row
from the placex table. 'class_type' defines the type of result
to return. Returns None if the row is None.
"""
if row is None:
return None
return class_type(source_table=SourceTable.PLACEX,
place_id=row.place_id,
osm_object=(row.osm_type, row.osm_id),
category=(row.class_, row.type),
names=_mingle_name_tags(row.name),
address=row.address,
extratags=row.extratags,
housenumber=row.housenumber,
postcode=row.postcode,
wikipedia=row.wikipedia,
rank_address=row.rank_address,
rank_search=row.rank_search,
importance=row.importance,
country_code=row.country_code,
centroid=Point.from_wkb(row.centroid),
geometry=_filter_geometries(row))
def create_from_osmline_row(row: Optional[SaRow],
class_type: Type[BaseResultT]) -> Optional[BaseResultT]:
""" Construct a new result and add the data from the result row
from the address interpolation table osmline. 'class_type' defines
the type of result to return. Returns None if the row is None.
If the row contains a housenumber, then the housenumber is filled out.
Otherwise the result contains the interpolation information in extratags.
"""
if row is None:
return None
hnr = getattr(row, 'housenumber', None)
res = class_type(source_table=SourceTable.OSMLINE,
place_id=row.place_id,
osm_object=('W', row.osm_id),
category=('place', 'houses' if hnr is None else 'house'),
address=row.address,
postcode=row.postcode,
country_code=row.country_code,
centroid=Point.from_wkb(row.centroid),
geometry=_filter_geometries(row))
if hnr is None:
res.extratags = {'startnumber': str(row.startnumber),
'endnumber': str(row.endnumber),
'step': str(row.step)}
else:
res.housenumber = str(hnr)
return res
def create_from_tiger_row(row: Optional[SaRow],
class_type: Type[BaseResultT],
osm_type: Optional[str] = None,
osm_id: Optional[int] = None) -> Optional[BaseResultT]:
""" Construct a new result and add the data from the result row
from the Tiger data interpolation table. 'class_type' defines
the type of result to return. Returns None if the row is None.
If the row contains a housenumber, then the housenumber is filled out.
Otherwise the result contains the interpolation information in extratags.
"""
if row is None:
return None
hnr = getattr(row, 'housenumber', None)
res = class_type(source_table=SourceTable.TIGER,
place_id=row.place_id,
osm_object=(osm_type or row.osm_type, osm_id or row.osm_id),
category=('place', 'houses' if hnr is None else 'house'),
postcode=row.postcode,
country_code='us',
centroid=Point.from_wkb(row.centroid),
geometry=_filter_geometries(row))
if hnr is None:
res.extratags = {'startnumber': str(row.startnumber),
'endnumber': str(row.endnumber),
'step': str(row.step)}
else:
res.housenumber = str(hnr)
return res
def create_from_postcode_row(row: Optional[SaRow],
class_type: Type[BaseResultT]) -> Optional[BaseResultT]:
""" Construct a new result and add the data from the result row
from the postcode table. 'class_type' defines
the type of result to return. Returns None if the row is None.
"""
if row is None:
return None
return class_type(source_table=SourceTable.POSTCODE,
place_id=row.place_id,
category=('place', 'postcode'),
names={'ref': row.postcode},
rank_search=row.rank_search,
rank_address=row.rank_address,
country_code=row.country_code,
centroid=Point.from_wkb(row.centroid),
geometry=_filter_geometries(row))
def create_from_country_row(row: Optional[SaRow],
class_type: Type[BaseResultT]) -> Optional[BaseResultT]:
""" Construct a new result and add the data from the result row
from the fallback country tables. 'class_type' defines
the type of result to return. Returns None if the row is None.
"""
if row is None:
return None
return class_type(source_table=SourceTable.COUNTRY,
category=('place', 'country'),
centroid=Point.from_wkb(row.centroid),
names=row.name,
rank_address=4, rank_search=4,
country_code=row.country_code)
async def add_result_details(conn: SearchConnection, results: List[BaseResultT],
details: LookupDetails) -> None:
""" Retrieve more details from the database according to the
parameters specified in 'details'.
"""
if results:
log().section('Query details for result')
if details.address_details:
log().comment('Query address details')
await complete_address_details(conn, results)
if details.linked_places:
log().comment('Query linked places')
for result in results:
await complete_linked_places(conn, result)
if details.parented_places:
log().comment('Query parent places')
for result in results:
await complete_parented_places(conn, result)
if details.keywords:
log().comment('Query keywords')
for result in results:
await complete_keywords(conn, result)
def _result_row_to_address_row(row: SaRow) -> AddressLine:
""" Create a new AddressLine from the results of a datbase query.
"""
extratags: Dict[str, str] = getattr(row, 'extratags', {})
if hasattr(row, 'place_type') and row.place_type:
extratags['place'] = row.place_type
names = _mingle_name_tags(row.name) or {}
if getattr(row, 'housenumber', None) is not None:
names['housenumber'] = row.housenumber
return AddressLine(place_id=row.place_id,
osm_object=None if row.osm_type is None else (row.osm_type, row.osm_id),
category=(getattr(row, 'class'), row.type),
names=names,
extratags=extratags,
admin_level=row.admin_level,
fromarea=row.fromarea,
isaddress=getattr(row, 'isaddress', True),
rank_address=row.rank_address,
distance=row.distance)
def _get_housenumber_details(results: List[BaseResultT]) -> Tuple[List[int], List[int]]:
places = []
hnrs = []
for result in results:
if result.place_id:
housenumber = -1
if result.source_table in (SourceTable.TIGER, SourceTable.OSMLINE):
if result.housenumber is not None:
housenumber = int(result.housenumber)
elif result.extratags is not None and 'startnumber' in result.extratags:
# details requests do not come with a specific house number
housenumber = int(result.extratags['startnumber'])
places.append(result.place_id)
hnrs.append(housenumber)
return places, hnrs
async def complete_address_details(conn: SearchConnection, results: List[BaseResultT]) -> None:
""" Retrieve information about places that make up the address of the result.
"""
places, hnrs = _get_housenumber_details(results)
if not places:
return
def _get_addressdata(place_id: Union[int, SaColumn], hnr: Union[int, SaColumn]) -> Any:
return sa.func.get_addressdata(place_id, hnr)\
.table_valued( # type: ignore[no-untyped-call]
sa.column('place_id', type_=sa.Integer),
'osm_type',
sa.column('osm_id', type_=sa.BigInteger),
sa.column('name', type_=conn.t.types.Composite),
'class', 'type', 'place_type',
sa.column('admin_level', type_=sa.Integer),
sa.column('fromarea', type_=sa.Boolean),
sa.column('isaddress', type_=sa.Boolean),
sa.column('rank_address', type_=sa.SmallInteger),
sa.column('distance', type_=sa.Float),
joins_implicitly=True)
if len(places) == 1:
# Optimized case for exactly one result (reverse)
sql = sa.select(_get_addressdata(places[0], hnrs[0]))\
.order_by(sa.column('rank_address').desc(),
sa.column('isaddress').desc())
alines = AddressLines()
for row in await conn.execute(sql):
alines.append(_result_row_to_address_row(row))
for result in results:
if result.place_id == places[0]:
result.address_rows = alines
return
darray = sa.func.unnest(conn.t.types.to_array(places), conn.t.types.to_array(hnrs))\
.table_valued( # type: ignore[no-untyped-call]
sa.column('place_id', type_= sa.Integer),
sa.column('housenumber', type_= sa.Integer)
).render_derived()
sfn = _get_addressdata(darray.c.place_id, darray.c.housenumber)
sql = sa.select(darray.c.place_id.label('result_place_id'), sfn)\
.order_by(darray.c.place_id,
sa.column('rank_address').desc(),
sa.column('isaddress').desc())
current_result = None
for row in await conn.execute(sql):
if current_result is None or row.result_place_id != current_result.place_id:
for result in results:
if result.place_id == row.result_place_id:
current_result = result
break
else:
assert False
current_result.address_rows = AddressLines()
current_result.address_rows.append(_result_row_to_address_row(row))
# pylint: disable=consider-using-f-string
def _placex_select_address_row(conn: SearchConnection,
centroid: Point) -> SaSelect:
t = conn.t.placex
return sa.select(t.c.place_id, t.c.osm_type, t.c.osm_id, t.c.name,
t.c.class_.label('class'), t.c.type,
t.c.admin_level, t.c.housenumber,
sa.literal_column("""ST_GeometryType(geometry) in
('ST_Polygon','ST_MultiPolygon')""").label('fromarea'),
t.c.rank_address,
sa.literal_column(
"""ST_DistanceSpheroid(geometry, 'SRID=4326;POINT(%f %f)'::geometry,
'SPHEROID["WGS 84",6378137,298.257223563, AUTHORITY["EPSG","7030"]]')
""" % centroid).label('distance'))
async def complete_linked_places(conn: SearchConnection, result: BaseResult) -> None:
""" Retrieve information about places that link to the result.
"""
result.linked_rows = AddressLines()
if result.source_table != SourceTable.PLACEX:
return
sql = _placex_select_address_row(conn, result.centroid)\
.where(conn.t.placex.c.linked_place_id == result.place_id)
for row in await conn.execute(sql):
result.linked_rows.append(_result_row_to_address_row(row))
async def complete_keywords(conn: SearchConnection, result: BaseResult) -> None:
""" Retrieve information about the search terms used for this place.
Requires that the query analyzer was initialised to get access to
the word table.
"""
t = conn.t.search_name
sql = sa.select(t.c.name_vector, t.c.nameaddress_vector)\
.where(t.c.place_id == result.place_id)
result.name_keywords = []
result.address_keywords = []
t = conn.t.meta.tables['word']
sel = sa.select(t.c.word_id, t.c.word_token, t.c.word)
for name_tokens, address_tokens in await conn.execute(sql):
for row in await conn.execute(sel.where(t.c.word_id == sa.any_(name_tokens))):
result.name_keywords.append(WordInfo(*row))
for row in await conn.execute(sel.where(t.c.word_id == sa.any_(address_tokens))):
result.address_keywords.append(WordInfo(*row))
async def complete_parented_places(conn: SearchConnection, result: BaseResult) -> None:
""" Retrieve information about places that the result provides the
address for.
"""
result.parented_rows = AddressLines()
if result.source_table != SourceTable.PLACEX:
return
sql = _placex_select_address_row(conn, result.centroid)\
.where(conn.t.placex.c.parent_place_id == result.place_id)\
.where(conn.t.placex.c.rank_search == 30)
for row in await conn.execute(sql):
result.parented_rows.append(_result_row_to_address_row(row))