Nominatim/test/python/tools/test_postcodes.py
2022-07-06 11:08:36 +02:00

239 lines
9.0 KiB
Python

# SPDX-License-Identifier: GPL-2.0-only
#
# This file is part of Nominatim. (https://nominatim.org)
#
# Copyright (C) 2022 by the Nominatim developer community.
# For a full list of authors see the git log.
"""
Tests for functions to maintain the artificial postcode table.
"""
import subprocess
import pytest
from nominatim.tools import postcodes
from nominatim.data import country_info
import dummy_tokenizer
class MockPostcodeTable:
""" A location_postcode table for testing.
"""
def __init__(self, conn):
self.conn = conn
with conn.cursor() as cur:
cur.execute("""CREATE TABLE location_postcode (
place_id BIGINT,
parent_place_id BIGINT,
rank_search SMALLINT,
rank_address SMALLINT,
indexed_status SMALLINT,
indexed_date TIMESTAMP,
country_code varchar(2),
postcode TEXT,
geometry GEOMETRY(Geometry, 4326))""")
cur.execute("""CREATE OR REPLACE FUNCTION token_normalized_postcode(postcode TEXT)
RETURNS TEXT AS $$ BEGIN RETURN postcode; END; $$ LANGUAGE plpgsql;
CREATE OR REPLACE FUNCTION get_country_code(place geometry)
RETURNS TEXT AS $$ BEGIN
RETURN null;
END; $$ LANGUAGE plpgsql;
""")
conn.commit()
def add(self, country, postcode, x, y):
with self.conn.cursor() as cur:
cur.execute("""INSERT INTO location_postcode (place_id, indexed_status,
country_code, postcode,
geometry)
VALUES (nextval('seq_place'), 1, %s, %s,
'SRID=4326;POINT(%s %s)')""",
(country, postcode, x, y))
self.conn.commit()
@property
def row_set(self):
with self.conn.cursor() as cur:
cur.execute("""SELECT country_code, postcode,
ST_X(geometry), ST_Y(geometry)
FROM location_postcode""")
return set((tuple(row) for row in cur))
@pytest.fixture
def tokenizer():
return dummy_tokenizer.DummyTokenizer(None, None)
@pytest.fixture
def postcode_table(def_config, temp_db_conn, placex_table):
country_info.setup_country_config(def_config)
return MockPostcodeTable(temp_db_conn)
@pytest.fixture
def insert_implicit_postcode(placex_table, place_row):
"""
Inserts data into the placex and place table
which can then be used to compute one postcode.
"""
def _insert_implicit_postcode(osm_id, country, geometry, address):
placex_table.add(osm_id=osm_id, country=country, geom=geometry)
place_row(osm_id=osm_id, geom='SRID=4326;'+geometry, address=address)
return _insert_implicit_postcode
def test_postcodes_empty(dsn, postcode_table, place_table,
tmp_path, tokenizer):
postcodes.update_postcodes(dsn, tmp_path, tokenizer)
assert not postcode_table.row_set
def test_postcodes_add_new(dsn, postcode_table, tmp_path,
insert_implicit_postcode, tokenizer):
insert_implicit_postcode(1, 'xx', 'POINT(10 12)', dict(postcode='9486'))
postcode_table.add('yy', '9486', 99, 34)
postcodes.update_postcodes(dsn, tmp_path, tokenizer)
assert postcode_table.row_set == {('xx', '9486', 10, 12), }
def test_postcodes_replace_coordinates(dsn, postcode_table, tmp_path,
insert_implicit_postcode, tokenizer):
insert_implicit_postcode(1, 'xx', 'POINT(10 12)', dict(postcode='AB 4511'))
postcode_table.add('xx', 'AB 4511', 99, 34)
postcodes.update_postcodes(dsn, tmp_path, tokenizer)
assert postcode_table.row_set == {('xx', 'AB 4511', 10, 12)}
def test_postcodes_replace_coordinates_close(dsn, postcode_table, tmp_path,
insert_implicit_postcode, tokenizer):
insert_implicit_postcode(1, 'xx', 'POINT(10 12)', dict(postcode='AB 4511'))
postcode_table.add('xx', 'AB 4511', 10, 11.99999)
postcodes.update_postcodes(dsn, tmp_path, tokenizer)
assert postcode_table.row_set == {('xx', 'AB 4511', 10, 11.99999)}
def test_postcodes_remove(dsn, postcode_table, tmp_path,
insert_implicit_postcode, tokenizer):
insert_implicit_postcode(1, 'xx', 'POINT(10 12)', dict(postcode='AB 4511'))
postcode_table.add('xx', 'badname', 10, 12)
postcodes.update_postcodes(dsn, tmp_path, tokenizer)
assert postcode_table.row_set == {('xx', 'AB 4511', 10, 12)}
def test_postcodes_ignore_empty_country(dsn, postcode_table, tmp_path,
insert_implicit_postcode, tokenizer):
insert_implicit_postcode(1, None, 'POINT(10 12)', dict(postcode='AB 4511'))
postcodes.update_postcodes(dsn, tmp_path, tokenizer)
assert not postcode_table.row_set
def test_postcodes_remove_all(dsn, postcode_table, place_table,
tmp_path, tokenizer):
postcode_table.add('ch', '5613', 10, 12)
postcodes.update_postcodes(dsn, tmp_path, tokenizer)
assert not postcode_table.row_set
def test_postcodes_multi_country(dsn, postcode_table, tmp_path,
insert_implicit_postcode, tokenizer):
insert_implicit_postcode(1, 'de', 'POINT(10 12)', dict(postcode='54451'))
insert_implicit_postcode(2, 'cc', 'POINT(100 56)', dict(postcode='DD23 T'))
insert_implicit_postcode(3, 'de', 'POINT(10.3 11.0)', dict(postcode='54452'))
insert_implicit_postcode(4, 'cc', 'POINT(10.3 11.0)', dict(postcode='54452'))
postcodes.update_postcodes(dsn, tmp_path, tokenizer)
assert postcode_table.row_set == {('de', '54451', 10, 12),
('de', '54452', 10.3, 11.0),
('cc', '54452', 10.3, 11.0),
('cc', 'DD23 T', 100, 56)}
@pytest.mark.parametrize("gzipped", [True, False])
def test_postcodes_extern(dsn, postcode_table, tmp_path,
insert_implicit_postcode, tokenizer, gzipped):
insert_implicit_postcode(1, 'xx', 'POINT(10 12)', dict(postcode='AB 4511'))
extfile = tmp_path / 'xx_postcodes.csv'
extfile.write_text("postcode,lat,lon\nAB 4511,-4,-1\nCD 4511,-5, -10")
if gzipped:
subprocess.run(['gzip', str(extfile)])
assert not extfile.is_file()
postcodes.update_postcodes(dsn, tmp_path, tokenizer)
assert postcode_table.row_set == {('xx', 'AB 4511', 10, 12),
('xx', 'CD 4511', -10, -5)}
def test_postcodes_extern_bad_column(dsn, postcode_table, tmp_path,
insert_implicit_postcode, tokenizer):
insert_implicit_postcode(1, 'xx', 'POINT(10 12)', dict(postcode='AB 4511'))
extfile = tmp_path / 'xx_postcodes.csv'
extfile.write_text("postode,lat,lon\nAB 4511,-4,-1\nCD 4511,-5, -10")
postcodes.update_postcodes(dsn, tmp_path, tokenizer)
assert postcode_table.row_set == {('xx', 'AB 4511', 10, 12)}
def test_postcodes_extern_bad_number(dsn, insert_implicit_postcode,
postcode_table, tmp_path, tokenizer):
insert_implicit_postcode(1, 'xx', 'POINT(10 12)', dict(postcode='AB 4511'))
extfile = tmp_path / 'xx_postcodes.csv'
extfile.write_text("postcode,lat,lon\nXX 4511,-4,NaN\nCD 4511,-5, -10\n34,200,0")
postcodes.update_postcodes(dsn, tmp_path, tokenizer)
assert postcode_table.row_set == {('xx', 'AB 4511', 10, 12),
('xx', 'CD 4511', -10, -5)}
def test_can_compute(dsn, table_factory):
assert not postcodes.can_compute(dsn)
table_factory('place')
assert postcodes.can_compute(dsn)
def test_no_placex_entry(dsn, tmp_path, temp_db_cursor, place_row, postcode_table, tokenizer):
#Rewrite the get_country_code function to verify its execution.
temp_db_cursor.execute("""
CREATE OR REPLACE FUNCTION get_country_code(place geometry)
RETURNS TEXT AS $$ BEGIN
RETURN 'yy';
END; $$ LANGUAGE plpgsql;
""")
place_row(geom='SRID=4326;POINT(10 12)', address=dict(postcode='AB 4511'))
postcodes.update_postcodes(dsn, tmp_path, tokenizer)
assert postcode_table.row_set == {('yy', 'AB 4511', 10, 12)}
def test_discard_badly_formatted_postcodes(dsn, tmp_path, temp_db_cursor, place_row, postcode_table, tokenizer):
#Rewrite the get_country_code function to verify its execution.
temp_db_cursor.execute("""
CREATE OR REPLACE FUNCTION get_country_code(place geometry)
RETURNS TEXT AS $$ BEGIN
RETURN 'fr';
END; $$ LANGUAGE plpgsql;
""")
place_row(geom='SRID=4326;POINT(10 12)', address=dict(postcode='AB 4511'))
postcodes.update_postcodes(dsn, tmp_path, tokenizer)
assert not postcode_table.row_set