From 2735ea768aa812998a9498cf411563f118bd6ad6 Mon Sep 17 00:00:00 2001 From: Sarah Hoffmann Date: Tue, 12 Nov 2024 22:15:43 +0100 Subject: [PATCH] look up all places at once --- src/nominatim_api/core.py | 5 +- src/nominatim_api/lookup.py | 464 +++++++++++++++++++++--------------- src/nominatim_api/types.py | 11 + 3 files changed, 287 insertions(+), 193 deletions(-) diff --git a/src/nominatim_api/core.py b/src/nominatim_api/core.py index 3f4652bf..c45b24de 100644 --- a/src/nominatim_api/core.py +++ b/src/nominatim_api/core.py @@ -24,7 +24,7 @@ from .config import Configuration from .sql import sqlite_functions, sqlalchemy_functions # noqa from .connection import SearchConnection from .status import get_status, StatusResult -from .lookup import get_detailed_place, get_simple_place +from .lookup import get_places, get_detailed_place from .reverse import ReverseGeocoder from .search import ForwardGeocoder, Phrase, PhraseType, make_query_analyzer from . import types as ntyp @@ -222,8 +222,7 @@ class NominatimAPIAsync: conn.set_query_timeout(self.query_timeout) if details.keywords: await make_query_analyzer(conn) - return SearchResults(filter(None, - [await get_simple_place(conn, p, details) for p in places])) + return await get_places(conn, places, details) async def reverse(self, coord: ntyp.AnyPoint, **params: Any) -> Optional[ReverseResult]: """ Find a place by its coordinates. Also known as reverse geocoding. diff --git a/src/nominatim_api/lookup.py b/src/nominatim_api/lookup.py index 89a0258e..618e0a09 100644 --- a/src/nominatim_api/lookup.py +++ b/src/nominatim_api/lookup.py @@ -5,9 +5,10 @@ # Copyright (C) 2024 by the Nominatim developer community. # For a full list of authors see the git log. """ -Implementation of place lookup by ID. +Implementation of place lookup by ID (doing many places at once). """ -from typing import Optional, Callable, Tuple, Type +from typing import Optional, Callable, Type, Iterable, Tuple, Union +from dataclasses import dataclass import datetime as dt import sqlalchemy as sa @@ -20,13 +21,173 @@ from . import results as nres RowFunc = Callable[[Optional[SaRow], Type[nres.BaseResultT]], Optional[nres.BaseResultT]] -GeomFunc = Callable[[SaSelect, SaColumn], SaSelect] +GEOMETRY_TYPE_MAP = { + 'POINT': 'ST_Point', + 'MULTIPOINT': 'ST_MultiPoint', + 'LINESTRING': 'ST_LineString', + 'MULTILINESTRING': 'ST_MultiLineString', + 'POLYGON': 'ST_Polygon', + 'MULTIPOLYGON': 'ST_MultiPolygon', + 'GEOMETRYCOLLECTION': 'ST_GeometryCollection' +} -async def find_in_placex(conn: SearchConnection, place: ntyp.PlaceRef, - add_geometries: GeomFunc) -> Optional[SaRow]: - """ Search for the given place in the placex table and return the - base information. +@dataclass +class LookupTuple: + """ Data class saving the SQL result for a single lookup. + """ + pid: ntyp.PlaceRef + result: Optional[nres.SearchResult] = None + + +class LookupCollector: + """ Result collector for the simple lookup. + + Allows for lookup of multiple places simultaniously. + """ + + def __init__(self, places: Iterable[ntyp.PlaceRef], + details: ntyp.LookupDetails) -> None: + self.details = details + self.lookups = [LookupTuple(p) for p in places] + + def get_results(self) -> nres.SearchResults: + """ Return the list of results available. + """ + return nres.SearchResults(p.result for p in self.lookups if p.result is not None) + + async def add_rows_from_sql(self, conn: SearchConnection, sql: SaSelect, + col: SaColumn, row_func: RowFunc[nres.SearchResult]) -> bool: + if self.details.geometry_output: + if self.details.geometry_simplification > 0.0: + col = sa.func.ST_SimplifyPreserveTopology( + col, self.details.geometry_simplification) + + if self.details.geometry_output & ntyp.GeometryFormat.GEOJSON: + sql = sql.add_columns(sa.func.ST_AsGeoJSON(col, 7).label('geometry_geojson')) + if self.details.geometry_output & ntyp.GeometryFormat.TEXT: + sql = sql.add_columns(sa.func.ST_AsText(col).label('geometry_text')) + if self.details.geometry_output & ntyp.GeometryFormat.KML: + sql = sql.add_columns(sa.func.ST_AsKML(col, 7).label('geometry_kml')) + if self.details.geometry_output & ntyp.GeometryFormat.SVG: + sql = sql.add_columns(sa.func.ST_AsSVG(col, 0, 7).label('geometry_svg')) + + for row in await conn.execute(sql): + result = row_func(row, nres.SearchResult) + assert result is not None + if hasattr(row, 'bbox'): + result.bbox = ntyp.Bbox.from_wkb(row.bbox) + + if self.lookups[row._idx].result is None: + self.lookups[row._idx].result = result + + return all(p.result is not None for p in self.lookups) + + def enumerate_free_place_ids(self) -> Iterable[Tuple[int, ntyp.PlaceID]]: + return ((i, p.pid) for i, p in enumerate(self.lookups) + if p.result is None and isinstance(p.pid, ntyp.PlaceID)) + + def enumerate_free_osm_ids(self) -> Iterable[Tuple[int, ntyp.OsmID]]: + return ((i, p.pid) for i, p in enumerate(self.lookups) + if p.result is None and isinstance(p.pid, ntyp.OsmID)) + + +class DetailedCollector: + """ Result collector for detailed lookup. + + Only one place at the time may be looked up. + """ + + def __init__(self, place: ntyp.PlaceRef, with_geometry: bool) -> None: + self.with_geometry = with_geometry + self.place = place + self.result: Optional[nres.DetailedResult] = None + + async def add_rows_from_sql(self, conn: SearchConnection, sql: SaSelect, + col: SaColumn, row_func: RowFunc[nres.DetailedResult]) -> bool: + if self.with_geometry: + sql = sql.add_columns( + sa.func.ST_AsGeoJSON( + sa.case((sa.func.ST_NPoints(col) > 5000, + sa.func.ST_SimplifyPreserveTopology(col, 0.0001)), + else_=col), 7).label('geometry_geojson')) + else: + sql = sql.add_columns(sa.func.ST_GeometryType(col).label('geometry_type')) + + for row in await conn.execute(sql): + self.result = row_func(row, nres.DetailedResult) + assert self.result is not None + # add missing details + if 'type' in self.result.geometry: + self.result.geometry['type'] = \ + GEOMETRY_TYPE_MAP.get(self.result.geometry['type'], + self.result.geometry['type']) + indexed_date = getattr(row, 'indexed_date', None) + if indexed_date is not None: + self.result.indexed_date = indexed_date.replace(tzinfo=dt.timezone.utc) + + return True + + # Nothing found. + return False + + def enumerate_free_place_ids(self) -> Iterable[Tuple[int, ntyp.PlaceID]]: + if self.result is None and isinstance(self.place, ntyp.PlaceID): + return [(0, self.place)] + return [] + + def enumerate_free_osm_ids(self) -> Iterable[Tuple[int, ntyp.OsmID]]: + if self.result is None and isinstance(self.place, ntyp.OsmID): + return [(0, self.place)] + return [] + + +Collector = Union[LookupCollector, DetailedCollector] + + +async def get_detailed_place(conn: SearchConnection, place: ntyp.PlaceRef, + details: ntyp.LookupDetails) -> Optional[nres.DetailedResult]: + """ Retrieve a place with additional details from the database. + """ + log().function('get_detailed_place', place=place, details=details) + + if details.geometry_output and details.geometry_output != ntyp.GeometryFormat.GEOJSON: + raise ValueError("lookup only supports geojosn polygon output.") + + collector = DetailedCollector(place, + bool(details.geometry_output & ntyp.GeometryFormat.GEOJSON)) + + for func in (find_in_placex, find_in_osmline, find_in_postcode, find_in_tiger): + if await func(conn, collector): + break + + if collector.result is not None: + await nres.add_result_details(conn, [collector.result], details) + + return collector.result + + +async def get_places(conn: SearchConnection, places: Iterable[ntyp.PlaceRef], + details: ntyp.LookupDetails) -> nres.SearchResults: + """ Retrieve a list of places as simple search results from the + database. + """ + log().function('get_places', places=places, details=details) + + collector = LookupCollector(places, details) + + for func in (find_in_placex, find_in_osmline, find_in_postcode, find_in_tiger): + if await func(conn, collector): + break + + results = collector.get_results() + await nres.add_result_details(conn, results, details) + + return results + + +async def find_in_placex(conn: SearchConnection, collector: Collector) -> bool: + """ Search for the given places in the main placex table. """ log().section("Find in placex table") t = conn.t.placex @@ -40,26 +201,42 @@ async def find_in_placex(conn: SearchConnection, place: ntyp.PlaceRef, t.c.geometry.ST_Expand(0).label('bbox'), t.c.centroid) - if isinstance(place, ntyp.PlaceID): - sql = sql.where(t.c.place_id == place.place_id) - elif isinstance(place, ntyp.OsmID): - sql = sql.where(t.c.osm_type == place.osm_type)\ - .where(t.c.osm_id == place.osm_id) - if place.osm_class: - sql = sql.where(t.c.class_ == place.osm_class) - else: - sql = sql.order_by(t.c.class_) - sql = sql.limit(1) - else: - return None + osm_ids = [{'i': i, 'ot': p.osm_type, 'oi': p.osm_id, 'oc': p.osm_class or ''} + for i, p in collector.enumerate_free_osm_ids()] - return (await conn.execute(add_geometries(sql, t.c.geometry))).one_or_none() + if osm_ids: + oid_tab = sa.func.JsonArrayEach(sa.type_coerce(osm_ids, sa.JSON))\ + .table_valued(sa.column('value', type_=sa.JSON)) + psql = sql.add_columns(oid_tab.c.value['i'].as_integer().label('_idx'))\ + .where(t.c.osm_type == oid_tab.c.value['ot'].as_string())\ + .where(t.c.osm_id == oid_tab.c.value['oi'].as_string().cast(sa.BigInteger))\ + .where(sa.or_(oid_tab.c.value['oc'].as_string() == '', + oid_tab.c.value['oc'].as_string() == t.c.class_))\ + .order_by(t.c.class_) + + if await collector.add_rows_from_sql(conn, psql, t.c.geometry, + nres.create_from_placex_row): + return True + + place_ids = [{'i': i, 'id': p.place_id} + for i, p in collector.enumerate_free_place_ids()] + + if place_ids: + pid_tab = sa.func.JsonArrayEach(sa.type_coerce(place_ids, sa.JSON))\ + .table_valued(sa.column('value', type_=sa.JSON)) + psql = sql.add_columns(pid_tab.c.value['i'].as_integer().label('_idx'))\ + .where(t.c.place_id == pid_tab.c.value['id'].as_string().cast(sa.BigInteger)) + + return await collector.add_rows_from_sql(conn, psql, t.c.geometry, + nres.create_from_placex_row) + + return False -async def find_in_osmline(conn: SearchConnection, place: ntyp.PlaceRef, - add_geometries: GeomFunc) -> Optional[SaRow]: - """ Search for the given place in the osmline table and return the - base information. +async def find_in_osmline(conn: SearchConnection, collector: Collector) -> bool: + """ Search for the given places in the table for address interpolations. + + Return true when all places have been resolved. """ log().section("Find in interpolation table") t = conn.t.osmline @@ -68,182 +245,89 @@ async def find_in_osmline(conn: SearchConnection, place: ntyp.PlaceRef, t.c.step, t.c.address, t.c.postcode, t.c.country_code, t.c.linegeo.ST_Centroid().label('centroid')) - if isinstance(place, ntyp.PlaceID): - sql = sql.where(t.c.place_id == place.place_id) - elif isinstance(place, ntyp.OsmID) and place.osm_type == 'W': - # There may be multiple interpolations for a single way. - # If 'class' contains a number, return the one that belongs to that number. - sql = sql.where(t.c.osm_id == place.osm_id).limit(1) - if place.osm_class and place.osm_class.isdigit(): - sql = sql.order_by(sa.func.greatest(0, - int(place.osm_class) - t.c.endnumber, - t.c.startnumber - int(place.osm_class))) - else: - return None + osm_ids = [{'i': i, 'oi': p.osm_id, 'oc': p.class_as_housenumber()} + for i, p in collector.enumerate_free_osm_ids() if p.osm_type == 'W'] - return (await conn.execute(add_geometries(sql, t.c.linegeo))).one_or_none() + if osm_ids: + oid_tab = sa.func.JsonArrayEach(sa.type_coerce(osm_ids, sa.JSON))\ + .table_valued(sa.column('value', type_=sa.JSON)) + psql = sql.add_columns(oid_tab.c.value['i'].as_integer().label('_idx'))\ + .where(t.c.osm_id == oid_tab.c.value['oi'].as_string().cast(sa.BigInteger))\ + .order_by(sa.func.greatest(0, + oid_tab.c.value['oc'].as_integer() - t.c.endnumber, + t.c.startnumber - oid_tab.c.value['oc'].as_integer())) + + if await collector.add_rows_from_sql(conn, psql, t.c.linegeo, + nres.create_from_osmline_row): + return True + + place_ids = [{'i': i, 'id': p.place_id} + for i, p in collector.enumerate_free_place_ids()] + + if place_ids: + pid_tab = sa.func.JsonArrayEach(sa.type_coerce(place_ids, sa.JSON))\ + .table_valued(sa.column('value', type_=sa.JSON)) + psql = sql.add_columns(pid_tab.c.value['i'].label('_idx'))\ + .where(t.c.place_id == pid_tab.c.value['id'].as_string().cast(sa.BigInteger)) + + return await collector.add_rows_from_sql(conn, psql, t.c.linegeo, + nres.create_from_osmline_row) + + return False -async def find_in_tiger(conn: SearchConnection, place: ntyp.PlaceRef, - add_geometries: GeomFunc) -> Optional[SaRow]: - """ Search for the given place in the table of Tiger addresses and return - the base information. Only lookup by place ID is supported. +async def find_in_postcode(conn: SearchConnection, collector: Collector) -> bool: + """ Search for the given places in the postcode table. + + Return true when all places have been resolved. """ - if not isinstance(place, ntyp.PlaceID): - return None - - log().section("Find in TIGER table") - t = conn.t.tiger - parent = conn.t.placex - sql = sa.select(t.c.place_id, t.c.parent_place_id, - parent.c.osm_type, parent.c.osm_id, - t.c.startnumber, t.c.endnumber, t.c.step, - t.c.postcode, - t.c.linegeo.ST_Centroid().label('centroid'))\ - .where(t.c.place_id == place.place_id)\ - .join(parent, t.c.parent_place_id == parent.c.place_id, isouter=True) - - return (await conn.execute(add_geometries(sql, t.c.linegeo))).one_or_none() - - -async def find_in_postcode(conn: SearchConnection, place: ntyp.PlaceRef, - add_geometries: GeomFunc) -> Optional[SaRow]: - """ Search for the given place in the postcode table and return the - base information. Only lookup by place ID is supported. - """ - if not isinstance(place, ntyp.PlaceID): - return None - log().section("Find in postcode table") - t = conn.t.postcode - sql = sa.select(t.c.place_id, t.c.parent_place_id, - t.c.rank_search, t.c.rank_address, - t.c.indexed_date, t.c.postcode, t.c.country_code, - t.c.geometry.label('centroid')) \ - .where(t.c.place_id == place.place_id) - return (await conn.execute(add_geometries(sql, t.c.geometry))).one_or_none() + place_ids = [{'i': i, 'id': p.place_id} + for i, p in collector.enumerate_free_place_ids()] + + if place_ids: + pid_tab = sa.func.JsonArrayEach(sa.type_coerce(place_ids, sa.JSON))\ + .table_valued(sa.column('value', type_=sa.JSON)) + t = conn.t.postcode + sql = sa.select(pid_tab.c.value['i'].as_integer().label('_idx'), + t.c.place_id, t.c.parent_place_id, + t.c.rank_search, t.c.rank_address, + t.c.indexed_date, t.c.postcode, t.c.country_code, + t.c.geometry.label('centroid'))\ + .where(t.c.place_id == pid_tab.c.value['id'].as_string().cast(sa.BigInteger)) + + return await collector.add_rows_from_sql(conn, sql, t.c.geometry, + nres.create_from_postcode_row) + + return False -async def find_in_all_tables(conn: SearchConnection, place: ntyp.PlaceRef, - add_geometries: GeomFunc - ) -> Tuple[Optional[SaRow], RowFunc[nres.BaseResultT]]: - """ Search for the given place in all data tables - and return the base information. +async def find_in_tiger(conn: SearchConnection, collector: Collector) -> bool: + """ Search for the given places in the TIGER address table. + + Return true when all places have been resolved. """ - row = await find_in_placex(conn, place, add_geometries) - log().var_dump('Result (placex)', row) - if row is not None: - return row, nres.create_from_placex_row + log().section("Find in tiger table") - row = await find_in_osmline(conn, place, add_geometries) - log().var_dump('Result (osmline)', row) - if row is not None: - return row, nres.create_from_osmline_row + place_ids = [{'i': i, 'id': p.place_id} + for i, p in collector.enumerate_free_place_ids()] - row = await find_in_postcode(conn, place, add_geometries) - log().var_dump('Result (postcode)', row) - if row is not None: - return row, nres.create_from_postcode_row + if place_ids: + pid_tab = sa.func.JsonArrayEach(sa.type_coerce(place_ids, sa.JSON))\ + .table_valued(sa.column('value', type_=sa.JSON)) + t = conn.t.tiger + parent = conn.t.placex + sql = sa.select(pid_tab.c.value['i'].as_integer().label('_idx'), + t.c.place_id, t.c.parent_place_id, + parent.c.osm_type, parent.c.osm_id, + t.c.startnumber, t.c.endnumber, t.c.step, + t.c.postcode, + t.c.linegeo.ST_Centroid().label('centroid'))\ + .join(parent, t.c.parent_place_id == parent.c.place_id, isouter=True)\ + .where(t.c.place_id == pid_tab.c.value['id'].as_string().cast(sa.BigInteger)) - row = await find_in_tiger(conn, place, add_geometries) - log().var_dump('Result (tiger)', row) - return row, nres.create_from_tiger_row + return await collector.add_rows_from_sql(conn, sql, t.c.linegeo, + nres.create_from_tiger_row) - -async def get_detailed_place(conn: SearchConnection, place: ntyp.PlaceRef, - details: ntyp.LookupDetails) -> Optional[nres.DetailedResult]: - """ Retrieve a place with additional details from the database. - """ - log().function('get_detailed_place', place=place, details=details) - - if details.geometry_output and details.geometry_output != ntyp.GeometryFormat.GEOJSON: - raise ValueError("lookup only supports geojosn polygon output.") - - if details.geometry_output & ntyp.GeometryFormat.GEOJSON: - def _add_geometry(sql: SaSelect, column: SaColumn) -> SaSelect: - return sql.add_columns(sa.func.ST_AsGeoJSON( - sa.case((sa.func.ST_NPoints(column) > 5000, - sa.func.ST_SimplifyPreserveTopology(column, 0.0001)), - else_=column), 7).label('geometry_geojson')) - else: - def _add_geometry(sql: SaSelect, column: SaColumn) -> SaSelect: - return sql.add_columns(sa.func.ST_GeometryType(column).label('geometry_type')) - - row_func: RowFunc[nres.DetailedResult] - row, row_func = await find_in_all_tables(conn, place, _add_geometry) - - if row is None: - return None - - result = row_func(row, nres.DetailedResult) - assert result is not None - - # add missing details - assert result is not None - if 'type' in result.geometry: - result.geometry['type'] = GEOMETRY_TYPE_MAP.get(result.geometry['type'], - result.geometry['type']) - indexed_date = getattr(row, 'indexed_date', None) - if indexed_date is not None: - result.indexed_date = indexed_date.replace(tzinfo=dt.timezone.utc) - - await nres.add_result_details(conn, [result], details) - - return result - - -async def get_simple_place(conn: SearchConnection, place: ntyp.PlaceRef, - details: ntyp.LookupDetails) -> Optional[nres.SearchResult]: - """ Retrieve a place as a simple search result from the database. - """ - log().function('get_simple_place', place=place, details=details) - - def _add_geometry(sql: SaSelect, col: SaColumn) -> SaSelect: - if not details.geometry_output: - return sql - - out = [] - - if details.geometry_simplification > 0.0: - col = sa.func.ST_SimplifyPreserveTopology(col, details.geometry_simplification) - - if details.geometry_output & ntyp.GeometryFormat.GEOJSON: - out.append(sa.func.ST_AsGeoJSON(col, 7).label('geometry_geojson')) - if details.geometry_output & ntyp.GeometryFormat.TEXT: - out.append(sa.func.ST_AsText(col).label('geometry_text')) - if details.geometry_output & ntyp.GeometryFormat.KML: - out.append(sa.func.ST_AsKML(col, 7).label('geometry_kml')) - if details.geometry_output & ntyp.GeometryFormat.SVG: - out.append(sa.func.ST_AsSVG(col, 0, 7).label('geometry_svg')) - - return sql.add_columns(*out) - - row_func: RowFunc[nres.SearchResult] - row, row_func = await find_in_all_tables(conn, place, _add_geometry) - - if row is None: - return None - - result = row_func(row, nres.SearchResult) - assert result is not None - - # add missing details - assert result is not None - if hasattr(row, 'bbox'): - result.bbox = ntyp.Bbox.from_wkb(row.bbox) - - await nres.add_result_details(conn, [result], details) - - return result - - -GEOMETRY_TYPE_MAP = { - 'POINT': 'ST_Point', - 'MULTIPOINT': 'ST_MultiPoint', - 'LINESTRING': 'ST_LineString', - 'MULTILINESTRING': 'ST_MultiLineString', - 'POLYGON': 'ST_Polygon', - 'MULTIPOLYGON': 'ST_MultiPolygon', - 'GEOMETRYCOLLECTION': 'ST_GeometryCollection' -} + return False diff --git a/src/nominatim_api/types.py b/src/nominatim_api/types.py index 5f3309d9..66a3c553 100644 --- a/src/nominatim_api/types.py +++ b/src/nominatim_api/types.py @@ -61,6 +61,17 @@ class OsmID: if self.osm_type not in ('N', 'W', 'R'): raise ValueError(f"Illegal OSM type '{self.osm_type}'. Must be one of N, W, R.") + def class_as_housenumber(self) -> Optional[int]: + """ Interpret the class property as a housenumber and return it. + + If the OSM ID points to an interpolation, then the class may be + a number pointing to the exact number requested. This function + returns the housenumber as an int, if class is set and is a number. + """ + if self.osm_class and self.osm_class.isdigit(): + return int(self.osm_class) + return None + PlaceRef = Union[PlaceID, OsmID]