make code work with Spatialite 4.3

Transfer is_address_point into SQLAlchemy function, so that
json.has_key() can use the older json_extract() function.
And work around broken Distance function.
This commit is contained in:
Sarah Hoffmann 2023-10-16 16:57:13 +02:00
parent d8dca2a3a9
commit 899a04ad26
3 changed files with 33 additions and 25 deletions

View File

@ -84,12 +84,6 @@ def _locate_interpolation(table: SaFromClause) -> SaLabel:
else_=0).label('position')
def _is_address_point(table: SaFromClause) -> SaColumn:
return sa.and_(table.c.rank_address == 30,
sa.or_(table.c.housenumber != None,
sa.func.JsonHasKey(table.c.name, 'addr:housename')))
def _get_closest(*rows: Optional[SaRow]) -> Optional[SaRow]:
return min(rows, key=lambda row: 1000 if row is None else row.distance)
@ -203,7 +197,7 @@ class ReverseGeocoder:
max_rank = min(29, self.max_rank)
restrict.append(lambda: no_index(t.c.rank_address).between(26, max_rank))
if self.max_rank == 30:
restrict.append(lambda: _is_address_point(t))
restrict.append(lambda: sa.func.IsAddressPoint(t))
if self.layer_enabled(DataLayer.POI) and self.max_rank == 30:
restrict.append(lambda: sa.and_(no_index(t.c.rank_search) == 30,
t.c.class_.not_in(('place', 'building')),
@ -227,7 +221,7 @@ class ReverseGeocoder:
sql: SaLambdaSelect = sa.lambda_stmt(lambda: _select_from_placex(t)
.where(t.c.geometry.ST_DWithin(WKT_PARAM, 0.001))
.where(t.c.parent_place_id == parent_place_id)
.where(_is_address_point(t))
.where(sa.func.IsAddressPoint(t))
.where(t.c.indexed_status == 0)
.where(t.c.linked_place_id == None)
.order_by('distance')

View File

@ -132,6 +132,36 @@ def select_index_placex_geometry_reverse_lookupplacenode(table: str) -> 'sa.Text
f" AND {table}.osm_type = 'N'")
class IsAddressPoint(sa.sql.functions.GenericFunction[bool]):
type = sa.Boolean()
name = 'IsAddressPoint'
inherit_cache = True
def __init__(self, table: sa.Table) -> None:
super().__init__(table.c.rank_address, # type: ignore[no-untyped-call]
table.c.housenumber, table.c.name)
@compiles(IsAddressPoint) # type: ignore[no-untyped-call, misc]
def default_is_address_point(element: SaColumn,
compiler: 'sa.Compiled', **kw: Any) -> str:
rank, hnr, name = list(element.clauses)
return "(%s = 30 AND (%s IS NOT NULL OR %s ? 'addr:housename'))" % (
compiler.process(rank, **kw),
compiler.process(hnr, **kw),
compiler.process(name, **kw))
@compiles(IsAddressPoint, 'sqlite') # type: ignore[no-untyped-call, misc]
def sqlite_is_address_point(element: SaColumn,
compiler: 'sa.Compiled', **kw: Any) -> str:
rank, hnr, name = list(element.clauses)
return "(%s = 30 AND coalesce(%s, json_extract(%s, '$.addr:housename')) IS NOT NULL)" % (
compiler.process(rank, **kw),
compiler.process(hnr, **kw),
compiler.process(name, **kw))
class CrosscheckNames(sa.sql.functions.GenericFunction[bool]):
""" Check if in the given list of names in parameters 1 any of the names
from the JSON array in parameter 2 are contained.
@ -175,22 +205,6 @@ def sqlite_json_array_each(element: SaColumn, compiler: 'sa.Compiled', **kw: Any
return "json_each(%s)" % compiler.process(element.clauses, **kw)
class JsonHasKey(sa.sql.functions.GenericFunction[bool]):
""" Return elements of a json array as a set.
"""
type = sa.Boolean()
name = 'JsonHasKey'
inherit_cache = True
@compiles(JsonHasKey) # type: ignore[no-untyped-call, misc]
def compile_json_has_key(element: SaColumn,
compiler: 'sa.Compiled', **kw: Any) -> str:
arg1, arg2 = list(element.clauses)
return "%s->%s is not null" % (compiler.process(arg1, **kw),
compiler.process(arg2, **kw))
class Greatest(sa.sql.functions.GenericFunction[Any]):
""" Function to compute maximum of all its input parameters.
"""

View File

@ -38,7 +38,7 @@ def _default_distance_spheroid(element: SaColumn,
@compiles(Geometry_DistanceSpheroid, 'sqlite') # type: ignore[no-untyped-call, misc]
def _spatialite_distance_spheroid(element: SaColumn,
compiler: 'sa.Compiled', **kw: Any) -> str:
return "Distance(%s, true)" % compiler.process(element.clauses, **kw)
return "COALESCE(Distance(%s, true), 0.0)" % compiler.process(element.clauses, **kw)
class Geometry_IsLineLike(sa.sql.expression.FunctionElement[bool]):