mirror of
https://github.com/osm-search/Nominatim.git
synced 2024-11-30 12:22:10 +03:00
252 lines
9.7 KiB
Python
252 lines
9.7 KiB
Python
# SPDX-License-Identifier: GPL-3.0-or-later
|
|
#
|
|
# This file is part of Nominatim. (https://nominatim.org)
|
|
#
|
|
# Copyright (C) 2023 by the Nominatim developer community.
|
|
# For a full list of authors see the git log.
|
|
"""
|
|
Implementation of place lookup by ID.
|
|
"""
|
|
from typing import Optional, Callable, Tuple, Type
|
|
import datetime as dt
|
|
|
|
import sqlalchemy as sa
|
|
|
|
from nominatim.typing import SaColumn, SaRow, SaSelect
|
|
from nominatim.api.connection import SearchConnection
|
|
import nominatim.api.types as ntyp
|
|
import nominatim.api.results as nres
|
|
from nominatim.api.logging import log
|
|
|
|
RowFunc = Callable[[Optional[SaRow], Type[nres.BaseResultT]], Optional[nres.BaseResultT]]
|
|
|
|
GeomFunc = Callable[[SaSelect, SaColumn], SaSelect]
|
|
|
|
|
|
|
|
async def find_in_placex(conn: SearchConnection, place: ntyp.PlaceRef,
|
|
add_geometries: GeomFunc) -> Optional[SaRow]:
|
|
""" Search for the given place in the placex table and return the
|
|
base information.
|
|
"""
|
|
log().section("Find in placex table")
|
|
t = conn.t.placex
|
|
sql = sa.select(t.c.place_id, t.c.osm_type, t.c.osm_id, t.c.name,
|
|
t.c.class_, t.c.type, t.c.admin_level,
|
|
t.c.address, t.c.extratags,
|
|
t.c.housenumber, t.c.postcode, t.c.country_code,
|
|
t.c.importance, t.c.wikipedia, t.c.indexed_date,
|
|
t.c.parent_place_id, t.c.rank_address, t.c.rank_search,
|
|
t.c.linked_place_id,
|
|
t.c.geometry.ST_Expand(0).label('bbox'),
|
|
t.c.centroid)
|
|
|
|
if isinstance(place, ntyp.PlaceID):
|
|
sql = sql.where(t.c.place_id == place.place_id)
|
|
elif isinstance(place, ntyp.OsmID):
|
|
sql = sql.where(t.c.osm_type == place.osm_type)\
|
|
.where(t.c.osm_id == place.osm_id)
|
|
if place.osm_class:
|
|
sql = sql.where(t.c.class_ == place.osm_class)
|
|
else:
|
|
sql = sql.order_by(t.c.class_)
|
|
sql = sql.limit(1)
|
|
else:
|
|
return None
|
|
|
|
return (await conn.execute(add_geometries(sql, t.c.geometry))).one_or_none()
|
|
|
|
|
|
async def find_in_osmline(conn: SearchConnection, place: ntyp.PlaceRef,
|
|
add_geometries: GeomFunc) -> Optional[SaRow]:
|
|
""" Search for the given place in the osmline table and return the
|
|
base information.
|
|
"""
|
|
log().section("Find in interpolation table")
|
|
t = conn.t.osmline
|
|
sql = sa.select(t.c.place_id, t.c.osm_id, t.c.parent_place_id,
|
|
t.c.indexed_date, t.c.startnumber, t.c.endnumber,
|
|
t.c.step, t.c.address, t.c.postcode, t.c.country_code,
|
|
t.c.linegeo.ST_Centroid().label('centroid'))
|
|
|
|
if isinstance(place, ntyp.PlaceID):
|
|
sql = sql.where(t.c.place_id == place.place_id)
|
|
elif isinstance(place, ntyp.OsmID) and place.osm_type == 'W':
|
|
# There may be multiple interpolations for a single way.
|
|
# If 'class' contains a number, return the one that belongs to that number.
|
|
sql = sql.where(t.c.osm_id == place.osm_id).limit(1)
|
|
if place.osm_class and place.osm_class.isdigit():
|
|
sql = sql.order_by(sa.func.greatest(0,
|
|
int(place.osm_class) - t.c.endnumber,
|
|
t.c.startnumber - int(place.osm_class)))
|
|
else:
|
|
return None
|
|
|
|
return (await conn.execute(add_geometries(sql, t.c.linegeo))).one_or_none()
|
|
|
|
|
|
async def find_in_tiger(conn: SearchConnection, place: ntyp.PlaceRef,
|
|
add_geometries: GeomFunc) -> Optional[SaRow]:
|
|
""" Search for the given place in the table of Tiger addresses and return
|
|
the base information. Only lookup by place ID is supported.
|
|
"""
|
|
if not isinstance(place, ntyp.PlaceID):
|
|
return None
|
|
|
|
log().section("Find in TIGER table")
|
|
t = conn.t.tiger
|
|
parent = conn.t.placex
|
|
sql = sa.select(t.c.place_id, t.c.parent_place_id,
|
|
parent.c.osm_type, parent.c.osm_id,
|
|
t.c.startnumber, t.c.endnumber, t.c.step,
|
|
t.c.postcode,
|
|
t.c.linegeo.ST_Centroid().label('centroid'))\
|
|
.where(t.c.place_id == place.place_id)\
|
|
.join(parent, t.c.parent_place_id == parent.c.place_id, isouter=True)
|
|
|
|
return (await conn.execute(add_geometries(sql, t.c.linegeo))).one_or_none()
|
|
|
|
|
|
async def find_in_postcode(conn: SearchConnection, place: ntyp.PlaceRef,
|
|
add_geometries: GeomFunc) -> Optional[SaRow]:
|
|
""" Search for the given place in the postcode table and return the
|
|
base information. Only lookup by place ID is supported.
|
|
"""
|
|
if not isinstance(place, ntyp.PlaceID):
|
|
return None
|
|
|
|
log().section("Find in postcode table")
|
|
t = conn.t.postcode
|
|
sql = sa.select(t.c.place_id, t.c.parent_place_id,
|
|
t.c.rank_search, t.c.rank_address,
|
|
t.c.indexed_date, t.c.postcode, t.c.country_code,
|
|
t.c.geometry.label('centroid')) \
|
|
.where(t.c.place_id == place.place_id)
|
|
|
|
return (await conn.execute(add_geometries(sql, t.c.geometry))).one_or_none()
|
|
|
|
|
|
async def find_in_all_tables(conn: SearchConnection, place: ntyp.PlaceRef,
|
|
add_geometries: GeomFunc
|
|
) -> Tuple[Optional[SaRow], RowFunc[nres.BaseResultT]]:
|
|
""" Search for the given place in all data tables
|
|
and return the base information.
|
|
"""
|
|
row = await find_in_placex(conn, place, add_geometries)
|
|
log().var_dump('Result (placex)', row)
|
|
if row is not None:
|
|
return row, nres.create_from_placex_row
|
|
|
|
row = await find_in_osmline(conn, place, add_geometries)
|
|
log().var_dump('Result (osmline)', row)
|
|
if row is not None:
|
|
return row, nres.create_from_osmline_row
|
|
|
|
row = await find_in_postcode(conn, place, add_geometries)
|
|
log().var_dump('Result (postcode)', row)
|
|
if row is not None:
|
|
return row, nres.create_from_postcode_row
|
|
|
|
row = await find_in_tiger(conn, place, add_geometries)
|
|
log().var_dump('Result (tiger)', row)
|
|
return row, nres.create_from_tiger_row
|
|
|
|
|
|
async def get_detailed_place(conn: SearchConnection, place: ntyp.PlaceRef,
|
|
details: ntyp.LookupDetails) -> Optional[nres.DetailedResult]:
|
|
""" Retrieve a place with additional details from the database.
|
|
"""
|
|
log().function('get_detailed_place', place=place, details=details)
|
|
|
|
if details.geometry_output and details.geometry_output != ntyp.GeometryFormat.GEOJSON:
|
|
raise ValueError("lookup only supports geojosn polygon output.")
|
|
|
|
if details.geometry_output & ntyp.GeometryFormat.GEOJSON:
|
|
def _add_geometry(sql: SaSelect, column: SaColumn) -> SaSelect:
|
|
return sql.add_columns(sa.func.ST_AsGeoJSON(
|
|
sa.case((sa.func.ST_NPoints(column) > 5000,
|
|
sa.func.ST_SimplifyPreserveTopology(column, 0.0001)),
|
|
else_=column), 7).label('geometry_geojson'))
|
|
else:
|
|
def _add_geometry(sql: SaSelect, column: SaColumn) -> SaSelect:
|
|
return sql.add_columns(sa.func.ST_GeometryType(column).label('geometry_type'))
|
|
|
|
row_func: RowFunc[nres.DetailedResult]
|
|
row, row_func = await find_in_all_tables(conn, place, _add_geometry)
|
|
|
|
if row is None:
|
|
return None
|
|
|
|
result = row_func(row, nres.DetailedResult)
|
|
assert result is not None
|
|
|
|
# add missing details
|
|
assert result is not None
|
|
if 'type' in result.geometry:
|
|
result.geometry['type'] = GEOMETRY_TYPE_MAP.get(result.geometry['type'],
|
|
result.geometry['type'])
|
|
indexed_date = getattr(row, 'indexed_date', None)
|
|
if indexed_date is not None:
|
|
result.indexed_date = indexed_date.replace(tzinfo=dt.timezone.utc)
|
|
|
|
await nres.add_result_details(conn, [result], details)
|
|
|
|
return result
|
|
|
|
|
|
async def get_simple_place(conn: SearchConnection, place: ntyp.PlaceRef,
|
|
details: ntyp.LookupDetails) -> Optional[nres.SearchResult]:
|
|
""" Retrieve a place as a simple search result from the database.
|
|
"""
|
|
log().function('get_simple_place', place=place, details=details)
|
|
|
|
def _add_geometry(sql: SaSelect, col: SaColumn) -> SaSelect:
|
|
if not details.geometry_output:
|
|
return sql
|
|
|
|
out = []
|
|
|
|
if details.geometry_simplification > 0.0:
|
|
col = sa.func.ST_SimplifyPreserveTopology(col, details.geometry_simplification)
|
|
|
|
if details.geometry_output & ntyp.GeometryFormat.GEOJSON:
|
|
out.append(sa.func.ST_AsGeoJSON(col, 7).label('geometry_geojson'))
|
|
if details.geometry_output & ntyp.GeometryFormat.TEXT:
|
|
out.append(sa.func.ST_AsText(col).label('geometry_text'))
|
|
if details.geometry_output & ntyp.GeometryFormat.KML:
|
|
out.append(sa.func.ST_AsKML(col, 7).label('geometry_kml'))
|
|
if details.geometry_output & ntyp.GeometryFormat.SVG:
|
|
out.append(sa.func.ST_AsSVG(col, 0, 7).label('geometry_svg'))
|
|
|
|
return sql.add_columns(*out)
|
|
|
|
|
|
row_func: RowFunc[nres.SearchResult]
|
|
row, row_func = await find_in_all_tables(conn, place, _add_geometry)
|
|
|
|
if row is None:
|
|
return None
|
|
|
|
result = row_func(row, nres.SearchResult)
|
|
assert result is not None
|
|
|
|
# add missing details
|
|
assert result is not None
|
|
if hasattr(row, 'bbox'):
|
|
result.bbox = ntyp.Bbox.from_wkb(row.bbox)
|
|
|
|
await nres.add_result_details(conn, [result], details)
|
|
|
|
return result
|
|
|
|
|
|
GEOMETRY_TYPE_MAP = {
|
|
'POINT': 'ST_Point',
|
|
'MULTIPOINT': 'ST_MultiPoint',
|
|
'LINESTRING': 'ST_LineString',
|
|
'MULTILINESTRING': 'ST_MultiLineString',
|
|
'POLYGON': 'ST_Polygon',
|
|
'MULTIPOLYGON': 'ST_MultiPolygon',
|
|
'GEOMETRYCOLLECTION': 'ST_GeometryCollection'
|
|
}
|