mirror of
https://github.com/osm-search/Nominatim.git
synced 2024-11-27 00:49:55 +03:00
test: replace raw execute() with fixture code where possible
This commit is contained in:
parent
65bd749918
commit
c06a1d007a
@ -19,7 +19,7 @@ import nominatim.tokenizer.factory
|
||||
|
||||
import dummy_tokenizer
|
||||
import mocks
|
||||
from cursor import TestingCursor
|
||||
from cursor import CursorForTesting
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
@ -80,7 +80,7 @@ def temp_db_cursor(temp_db):
|
||||
"""
|
||||
conn = psycopg2.connect('dbname=' + temp_db)
|
||||
conn.set_isolation_level(0)
|
||||
with conn.cursor(cursor_factory=TestingCursor) as cur:
|
||||
with conn.cursor(cursor_factory=CursorForTesting) as cur:
|
||||
yield cur
|
||||
conn.close()
|
||||
|
||||
|
@ -3,7 +3,7 @@ Specialised psycopg2 cursor with shortcut functions useful for testing.
|
||||
"""
|
||||
import psycopg2.extras
|
||||
|
||||
class TestingCursor(psycopg2.extras.DictCursor):
|
||||
class CursorForTesting(psycopg2.extras.DictCursor):
|
||||
""" Extension to the DictCursor class that provides execution
|
||||
short-cuts that simplify writing assertions.
|
||||
"""
|
||||
|
@ -244,9 +244,9 @@ def test_add_data_command(mock_run_legacy, name, oid):
|
||||
(['--boundaries-only'], 1, 0),
|
||||
(['--no-boundaries'], 0, 1),
|
||||
(['--boundaries-only', '--no-boundaries'], 0, 0)])
|
||||
def test_index_command(mock_func_factory, temp_db_cursor, tokenizer_mock,
|
||||
def test_index_command(mock_func_factory, table_factory, tokenizer_mock,
|
||||
params, do_bnds, do_ranks):
|
||||
temp_db_cursor.execute("CREATE TABLE import_status (indexed bool)")
|
||||
table_factory('import_status', 'indexed bool')
|
||||
bnd_mock = mock_func_factory(nominatim.indexer.indexer.Indexer, 'index_boundaries')
|
||||
rank_mock = mock_func_factory(nominatim.indexer.indexer.Indexer, 'index_by_rank')
|
||||
|
||||
|
@ -20,10 +20,10 @@ def test_connection_table_exists(db, table_factory):
|
||||
assert db.table_exists('foobar') == True
|
||||
|
||||
|
||||
def test_connection_index_exists(db, temp_db_cursor):
|
||||
def test_connection_index_exists(db, table_factory, temp_db_cursor):
|
||||
assert db.index_exists('some_index') == False
|
||||
|
||||
temp_db_cursor.execute('CREATE TABLE foobar (id INT)')
|
||||
table_factory('foobar')
|
||||
temp_db_cursor.execute('CREATE INDEX some_index ON foobar(id)')
|
||||
|
||||
assert db.index_exists('some_index') == True
|
||||
@ -55,9 +55,7 @@ def test_connection_server_version_tuple(db):
|
||||
assert ver[0] > 8
|
||||
|
||||
|
||||
def test_connection_postgis_version_tuple(db, temp_db_cursor):
|
||||
temp_db_cursor.execute('CREATE EXTENSION postgis')
|
||||
|
||||
def test_connection_postgis_version_tuple(db, temp_db_with_extensions):
|
||||
ver = db.postgis_version_tuple()
|
||||
|
||||
assert isinstance(ver, tuple)
|
||||
|
@ -6,26 +6,32 @@ import pytest
|
||||
from nominatim.db import properties
|
||||
|
||||
@pytest.fixture
|
||||
def prop_table(table_factory):
|
||||
table_factory('nominatim_properties', 'property TEXT, value TEXT')
|
||||
def property_factory(property_table, temp_db_cursor):
|
||||
""" A function fixture that adds a property into the property table.
|
||||
"""
|
||||
def _add_property(name, value):
|
||||
temp_db_cursor.execute("INSERT INTO nominatim_properties VALUES(%s, %s)",
|
||||
(name, value))
|
||||
|
||||
return _add_property
|
||||
|
||||
|
||||
def test_get_property_existing(prop_table, temp_db_conn, temp_db_cursor):
|
||||
temp_db_cursor.execute("INSERT INTO nominatim_properties VALUES('foo', 'bar')")
|
||||
def test_get_property_existing(property_factory, temp_db_conn):
|
||||
property_factory('foo', 'bar')
|
||||
|
||||
assert properties.get_property(temp_db_conn, 'foo') == 'bar'
|
||||
|
||||
|
||||
def test_get_property_unknown(prop_table, temp_db_conn, temp_db_cursor):
|
||||
temp_db_cursor.execute("INSERT INTO nominatim_properties VALUES('other', 'bar')")
|
||||
def test_get_property_unknown(property_factory, temp_db_conn):
|
||||
property_factory('other', 'bar')
|
||||
|
||||
assert properties.get_property(temp_db_conn, 'foo') is None
|
||||
|
||||
|
||||
@pytest.mark.parametrize("prefill", (True, False))
|
||||
def test_set_property_new(prop_table, temp_db_conn, temp_db_cursor, prefill):
|
||||
def test_set_property_new(property_factory, temp_db_conn, temp_db_cursor, prefill):
|
||||
if prefill:
|
||||
temp_db_cursor.execute("INSERT INTO nominatim_properties VALUES('something', 'bar')")
|
||||
property_factory('something', 'bar')
|
||||
|
||||
properties.set_property(temp_db_conn, 'something', 'else')
|
||||
|
||||
|
@ -58,10 +58,8 @@ def test_set_status_empty_table(status_table, temp_db_conn, temp_db_cursor):
|
||||
date = dt.datetime.fromordinal(1000000).replace(tzinfo=dt.timezone.utc)
|
||||
nominatim.db.status.set_status(temp_db_conn, date=date)
|
||||
|
||||
temp_db_cursor.execute("SELECT * FROM import_status")
|
||||
|
||||
assert temp_db_cursor.rowcount == 1
|
||||
assert temp_db_cursor.fetchone() == [date, None, True]
|
||||
assert temp_db_cursor.row_set("SELECT * FROM import_status") == \
|
||||
{(date, None, True)}
|
||||
|
||||
|
||||
def test_set_status_filled_table(status_table, temp_db_conn, temp_db_cursor):
|
||||
@ -73,10 +71,8 @@ def test_set_status_filled_table(status_table, temp_db_conn, temp_db_cursor):
|
||||
date = dt.datetime.fromordinal(1000100).replace(tzinfo=dt.timezone.utc)
|
||||
nominatim.db.status.set_status(temp_db_conn, date=date, seq=456, indexed=False)
|
||||
|
||||
temp_db_cursor.execute("SELECT * FROM import_status")
|
||||
|
||||
assert temp_db_cursor.rowcount == 1
|
||||
assert temp_db_cursor.fetchone() == [date, 456, False]
|
||||
assert temp_db_cursor.row_set("SELECT * FROM import_status") == \
|
||||
{(date, 456, False)}
|
||||
|
||||
|
||||
def test_set_status_missing_date(status_table, temp_db_conn, temp_db_cursor):
|
||||
@ -87,10 +83,8 @@ def test_set_status_missing_date(status_table, temp_db_conn, temp_db_cursor):
|
||||
|
||||
nominatim.db.status.set_status(temp_db_conn, date=None, seq=456, indexed=False)
|
||||
|
||||
temp_db_cursor.execute("SELECT * FROM import_status")
|
||||
|
||||
assert temp_db_cursor.rowcount == 1
|
||||
assert temp_db_cursor.fetchone() == [date, 456, False]
|
||||
assert temp_db_cursor.row_set("SELECT * FROM import_status") == \
|
||||
{(date, 456, False)}
|
||||
|
||||
|
||||
def test_get_status_empty_table(status_table, temp_db_conn):
|
||||
|
@ -13,10 +13,7 @@ def test_execute_file_success(dsn, temp_db_cursor, tmp_path):
|
||||
|
||||
db_utils.execute_file(dsn, tmpfile)
|
||||
|
||||
temp_db_cursor.execute('SELECT * FROM test')
|
||||
|
||||
assert temp_db_cursor.rowcount == 1
|
||||
assert temp_db_cursor.fetchone()[0] == 56
|
||||
assert temp_db_cursor.row_set('SELECT * FROM test') == {(56, )}
|
||||
|
||||
def test_execute_file_bad_file(dsn, tmp_path):
|
||||
with pytest.raises(FileNotFoundError):
|
||||
@ -44,10 +41,7 @@ def test_execute_file_with_pre_code(dsn, tmp_path, temp_db_cursor):
|
||||
|
||||
db_utils.execute_file(dsn, tmpfile, pre_code='CREATE TABLE test (id INT)')
|
||||
|
||||
temp_db_cursor.execute('SELECT * FROM test')
|
||||
|
||||
assert temp_db_cursor.rowcount == 1
|
||||
assert temp_db_cursor.fetchone()[0] == 4
|
||||
assert temp_db_cursor.row_set('SELECT * FROM test') == {(4, )}
|
||||
|
||||
|
||||
def test_execute_file_with_post_code(dsn, tmp_path, temp_db_cursor):
|
||||
@ -56,7 +50,4 @@ def test_execute_file_with_post_code(dsn, tmp_path, temp_db_cursor):
|
||||
|
||||
db_utils.execute_file(dsn, tmpfile, post_code='INSERT INTO test VALUES(23)')
|
||||
|
||||
temp_db_cursor.execute('SELECT * FROM test')
|
||||
|
||||
assert temp_db_cursor.rowcount == 1
|
||||
assert temp_db_cursor.fetchone()[0] == 23
|
||||
assert temp_db_cursor.row_set('SELECT * FROM test') == {(23, )}
|
||||
|
@ -243,9 +243,8 @@ def test_update_special_phrase_delete_all(analyzer, word_table, temp_db_cursor,
|
||||
|
||||
def test_update_special_phrases_no_replace(analyzer, word_table, temp_db_cursor,
|
||||
make_standard_name):
|
||||
temp_db_cursor.execute("""INSERT INTO word (word_token, word, class, type, operator)
|
||||
VALUES (' foo', 'foo', 'amenity', 'prison', 'in'),
|
||||
(' bar', 'bar', 'highway', 'road', null)""")
|
||||
word_table.add_special(' foo', 'foo', 'amenity', 'prison', 'in')
|
||||
word_table.add_special(' bar', 'bar', 'highway', 'road', None)
|
||||
|
||||
assert word_table.count_special() == 2
|
||||
|
||||
|
@ -180,10 +180,9 @@ def test_update_special_phrase_empty_table(analyzer, word_table, temp_db_cursor)
|
||||
(' ST', 'street', 'highway', 'primary', 'in')))
|
||||
|
||||
|
||||
def test_update_special_phrase_delete_all(analyzer, word_table, temp_db_cursor):
|
||||
temp_db_cursor.execute("""INSERT INTO word (word_token, word, class, type, operator)
|
||||
VALUES (' FOO', 'foo', 'amenity', 'prison', 'in'),
|
||||
(' BAR', 'bar', 'highway', 'road', null)""")
|
||||
def test_update_special_phrase_delete_all(analyzer, word_table):
|
||||
word_table.add_special(' FOO', 'foo', 'amenity', 'prison', 'in')
|
||||
word_table.add_special(' BAR', 'bar', 'highway', 'road', None)
|
||||
|
||||
assert word_table.count_special() == 2
|
||||
|
||||
@ -193,10 +192,9 @@ def test_update_special_phrase_delete_all(analyzer, word_table, temp_db_cursor):
|
||||
assert word_table.count_special() == 0
|
||||
|
||||
|
||||
def test_update_special_phrases_no_replace(analyzer, word_table, temp_db_cursor,):
|
||||
temp_db_cursor.execute("""INSERT INTO word (word_token, word, class, type, operator)
|
||||
VALUES (' FOO', 'foo', 'amenity', 'prison', 'in'),
|
||||
(' BAR', 'bar', 'highway', 'road', null)""")
|
||||
def test_update_special_phrases_no_replace(analyzer, word_table):
|
||||
word_table.add_special(' FOO', 'foo', 'amenity', 'prison', 'in')
|
||||
word_table.add_special(' BAR', 'bar', 'highway', 'road', None)
|
||||
|
||||
assert word_table.count_special() == 2
|
||||
|
||||
@ -206,10 +204,9 @@ def test_update_special_phrases_no_replace(analyzer, word_table, temp_db_cursor,
|
||||
assert word_table.count_special() == 2
|
||||
|
||||
|
||||
def test_update_special_phrase_modify(analyzer, word_table, temp_db_cursor):
|
||||
temp_db_cursor.execute("""INSERT INTO word (word_token, word, class, type, operator)
|
||||
VALUES (' FOO', 'foo', 'amenity', 'prison', 'in'),
|
||||
(' BAR', 'bar', 'highway', 'road', null)""")
|
||||
def test_update_special_phrase_modify(analyzer, word_table):
|
||||
word_table.add_special(' FOO', 'foo', 'amenity', 'prison', 'in')
|
||||
word_table.add_special(' BAR', 'bar', 'highway', 'road', None)
|
||||
|
||||
assert word_table.count_special() == 2
|
||||
|
||||
|
@ -3,39 +3,40 @@ Tests for maintenance and analysis functions.
|
||||
"""
|
||||
import pytest
|
||||
|
||||
from nominatim.db.connection import connect
|
||||
from nominatim.errors import UsageError
|
||||
from nominatim.tools import admin
|
||||
|
||||
@pytest.fixture
|
||||
def db(temp_db, placex_table):
|
||||
with connect('dbname=' + temp_db) as conn:
|
||||
yield conn
|
||||
@pytest.fixture(autouse=True)
|
||||
def create_placex_table(placex_table):
|
||||
""" All tests in this module require the placex table to be set up.
|
||||
"""
|
||||
pass
|
||||
|
||||
def test_analyse_indexing_no_objects(db):
|
||||
|
||||
def test_analyse_indexing_no_objects(temp_db_conn):
|
||||
with pytest.raises(UsageError):
|
||||
admin.analyse_indexing(db)
|
||||
admin.analyse_indexing(temp_db_conn)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("oid", ['1234', 'N123a', 'X123'])
|
||||
def test_analyse_indexing_bad_osmid(db, oid):
|
||||
def test_analyse_indexing_bad_osmid(temp_db_conn, oid):
|
||||
with pytest.raises(UsageError):
|
||||
admin.analyse_indexing(db, osm_id=oid)
|
||||
admin.analyse_indexing(temp_db_conn, osm_id=oid)
|
||||
|
||||
|
||||
def test_analyse_indexing_unknown_osmid(db):
|
||||
def test_analyse_indexing_unknown_osmid(temp_db_conn):
|
||||
with pytest.raises(UsageError):
|
||||
admin.analyse_indexing(db, osm_id='W12345674')
|
||||
admin.analyse_indexing(temp_db_conn, osm_id='W12345674')
|
||||
|
||||
|
||||
def test_analyse_indexing_with_place_id(db, temp_db_cursor):
|
||||
def test_analyse_indexing_with_place_id(temp_db_conn, temp_db_cursor):
|
||||
temp_db_cursor.execute("INSERT INTO placex (place_id) VALUES(12345)")
|
||||
|
||||
admin.analyse_indexing(db, place_id=12345)
|
||||
admin.analyse_indexing(temp_db_conn, place_id=12345)
|
||||
|
||||
|
||||
def test_analyse_indexing_with_osm_id(db, temp_db_cursor):
|
||||
def test_analyse_indexing_with_osm_id(temp_db_conn, temp_db_cursor):
|
||||
temp_db_cursor.execute("""INSERT INTO placex (place_id, osm_type, osm_id)
|
||||
VALUES(9988, 'N', 10000)""")
|
||||
|
||||
admin.analyse_indexing(db, osm_id='N10000')
|
||||
admin.analyse_indexing(temp_db_conn, osm_id='N10000')
|
||||
|
@ -23,8 +23,8 @@ def test_check_conection_bad(def_config):
|
||||
assert chkdb.check_connection(badconn, def_config) == chkdb.CheckState.FATAL
|
||||
|
||||
|
||||
def test_check_placex_table_good(temp_db_cursor, temp_db_conn, def_config):
|
||||
temp_db_cursor.execute('CREATE TABLE placex (place_id int)')
|
||||
def test_check_placex_table_good(table_factory, temp_db_conn, def_config):
|
||||
table_factory('placex')
|
||||
assert chkdb.check_placex_table(temp_db_conn, def_config) == chkdb.CheckState.OK
|
||||
|
||||
|
||||
@ -32,14 +32,13 @@ def test_check_placex_table_bad(temp_db_conn, def_config):
|
||||
assert chkdb.check_placex_table(temp_db_conn, def_config) == chkdb.CheckState.FATAL
|
||||
|
||||
|
||||
def test_check_placex_table_size_good(temp_db_cursor, temp_db_conn, def_config):
|
||||
temp_db_cursor.execute('CREATE TABLE placex (place_id int)')
|
||||
temp_db_cursor.execute('INSERT INTO placex VALUES (1), (2)')
|
||||
def test_check_placex_table_size_good(table_factory, temp_db_conn, def_config):
|
||||
table_factory('placex', content=((1, ), (2, )))
|
||||
assert chkdb.check_placex_size(temp_db_conn, def_config) == chkdb.CheckState.OK
|
||||
|
||||
|
||||
def test_check_placex_table_size_bad(temp_db_cursor, temp_db_conn, def_config):
|
||||
temp_db_cursor.execute('CREATE TABLE placex (place_id int)')
|
||||
def test_check_placex_table_size_bad(table_factory, temp_db_conn, def_config):
|
||||
table_factory('placex')
|
||||
assert chkdb.check_placex_size(temp_db_conn, def_config) == chkdb.CheckState.FATAL
|
||||
|
||||
|
||||
@ -61,15 +60,15 @@ def test_check_tokenizer(tokenizer_mock, temp_db_conn, def_config, monkeypatch,
|
||||
assert chkdb.check_tokenizer(temp_db_conn, def_config) == state
|
||||
|
||||
|
||||
def test_check_indexing_good(temp_db_cursor, temp_db_conn, def_config):
|
||||
temp_db_cursor.execute('CREATE TABLE placex (place_id int, indexed_status smallint)')
|
||||
temp_db_cursor.execute('INSERT INTO placex VALUES (1, 0), (2, 0)')
|
||||
def test_check_indexing_good(table_factory, temp_db_conn, def_config):
|
||||
table_factory('placex', 'place_id int, indexed_status smallint',
|
||||
content=((1, 0), (2, 0)))
|
||||
assert chkdb.check_indexing(temp_db_conn, def_config) == chkdb.CheckState.OK
|
||||
|
||||
|
||||
def test_check_indexing_bad(temp_db_cursor, temp_db_conn, def_config):
|
||||
temp_db_cursor.execute('CREATE TABLE placex (place_id int, indexed_status smallint)')
|
||||
temp_db_cursor.execute('INSERT INTO placex VALUES (1, 0), (2, 2)')
|
||||
def test_check_indexing_bad(table_factory, temp_db_conn, def_config):
|
||||
table_factory('placex', 'place_id int, indexed_status smallint',
|
||||
content=((1, 0), (2, 2)))
|
||||
assert chkdb.check_indexing(temp_db_conn, def_config) == chkdb.CheckState.FAIL
|
||||
|
||||
|
||||
|
@ -67,10 +67,11 @@ def test_create_db_missing_ro_user(nonexistant_db):
|
||||
database_import.create_db('dbname=' + nonexistant_db, rouser='sdfwkjkjgdugu2;jgsafkljas;')
|
||||
|
||||
|
||||
def test_setup_extensions(temp_db_conn, temp_db_cursor):
|
||||
def test_setup_extensions(temp_db_conn, table_factory):
|
||||
database_import.setup_extensions(temp_db_conn)
|
||||
|
||||
temp_db_cursor.execute('CREATE TABLE t (h HSTORE, geom GEOMETRY(Geometry, 4326))')
|
||||
# Use table creation to check that hstore and geometry types are available.
|
||||
table_factory('t', 'h HSTORE, geom GEOMETRY(Geometry, 4326)')
|
||||
|
||||
|
||||
def test_setup_extensions_old_postgis(temp_db_conn, monkeypatch):
|
||||
@ -80,42 +81,36 @@ def test_setup_extensions_old_postgis(temp_db_conn, monkeypatch):
|
||||
database_import.setup_extensions(temp_db_conn)
|
||||
|
||||
|
||||
def test_import_base_data(src_dir, temp_db, temp_db_cursor):
|
||||
temp_db_cursor.execute('CREATE EXTENSION hstore')
|
||||
temp_db_cursor.execute('CREATE EXTENSION postgis')
|
||||
database_import.import_base_data('dbname=' + temp_db, src_dir / 'data')
|
||||
def test_import_base_data(dsn, src_dir, temp_db_with_extensions, temp_db_cursor):
|
||||
database_import.import_base_data(dsn, src_dir / 'data')
|
||||
|
||||
assert temp_db_cursor.table_rows('country_name') > 0
|
||||
|
||||
|
||||
def test_import_base_data_ignore_partitions(src_dir, temp_db, temp_db_cursor):
|
||||
temp_db_cursor.execute('CREATE EXTENSION hstore')
|
||||
temp_db_cursor.execute('CREATE EXTENSION postgis')
|
||||
database_import.import_base_data('dbname=' + temp_db, src_dir / 'data',
|
||||
ignore_partitions=True)
|
||||
def test_import_base_data_ignore_partitions(dsn, src_dir, temp_db_with_extensions,
|
||||
temp_db_cursor):
|
||||
database_import.import_base_data(dsn, src_dir / 'data', ignore_partitions=True)
|
||||
|
||||
assert temp_db_cursor.table_rows('country_name') > 0
|
||||
assert temp_db_cursor.table_rows('country_name', where='partition != 0') == 0
|
||||
|
||||
|
||||
def test_import_osm_data_simple(temp_db_cursor,osm2pgsql_options):
|
||||
temp_db_cursor.execute('CREATE TABLE place (id INT)')
|
||||
temp_db_cursor.execute('INSERT INTO place values (1)')
|
||||
def test_import_osm_data_simple(table_factory, osm2pgsql_options):
|
||||
table_factory('place', content=((1, ), ))
|
||||
|
||||
database_import.import_osm_data('file.pdf', osm2pgsql_options)
|
||||
|
||||
|
||||
def test_import_osm_data_simple_no_data(temp_db_cursor,osm2pgsql_options):
|
||||
temp_db_cursor.execute('CREATE TABLE place (id INT)')
|
||||
def test_import_osm_data_simple_no_data(table_factory, osm2pgsql_options):
|
||||
table_factory('place')
|
||||
|
||||
with pytest.raises(UsageError, match='No data.*'):
|
||||
database_import.import_osm_data('file.pdf', osm2pgsql_options)
|
||||
|
||||
|
||||
def test_import_osm_data_drop(temp_db_conn, temp_db_cursor, tmp_path, osm2pgsql_options):
|
||||
temp_db_cursor.execute('CREATE TABLE place (id INT)')
|
||||
temp_db_cursor.execute('CREATE TABLE planet_osm_nodes (id INT)')
|
||||
temp_db_cursor.execute('INSERT INTO place values (1)')
|
||||
def test_import_osm_data_drop(table_factory, temp_db_conn, tmp_path, osm2pgsql_options):
|
||||
table_factory('place', content=((1, ), ))
|
||||
table_factory('planet_osm_nodes')
|
||||
|
||||
flatfile = tmp_path / 'flatfile'
|
||||
flatfile.write_text('touch')
|
||||
@ -128,9 +123,8 @@ def test_import_osm_data_drop(temp_db_conn, temp_db_cursor, tmp_path, osm2pgsql_
|
||||
assert not temp_db_conn.table_exists('planet_osm_nodes')
|
||||
|
||||
|
||||
def test_import_osm_data_default_cache(temp_db_cursor,osm2pgsql_options):
|
||||
temp_db_cursor.execute('CREATE TABLE place (id INT)')
|
||||
temp_db_cursor.execute('INSERT INTO place values (1)')
|
||||
def test_import_osm_data_default_cache(table_factory, osm2pgsql_options):
|
||||
table_factory('place', content=((1, ), ))
|
||||
|
||||
osm2pgsql_options['osm2pgsql_cache'] = 0
|
||||
|
||||
|
@ -22,9 +22,9 @@ NOMINATIM_DROP_TABLES = [
|
||||
'wikipedia_article', 'wikipedia_redirect'
|
||||
]
|
||||
|
||||
def test_drop_tables(temp_db_conn, temp_db_cursor):
|
||||
def test_drop_tables(temp_db_conn, temp_db_cursor, table_factory):
|
||||
for table in NOMINATIM_RUNTIME_TABLES + NOMINATIM_DROP_TABLES:
|
||||
temp_db_cursor.execute('CREATE TABLE {} (id int)'.format(table))
|
||||
table_factory(table)
|
||||
|
||||
freeze.drop_update_tables(temp_db_conn)
|
||||
|
||||
|
@ -12,14 +12,16 @@ from nominatim.tools.special_phrases.sp_wiki_loader import SPWikiLoader
|
||||
from nominatim.tools.special_phrases.sp_csv_loader import SPCsvLoader
|
||||
from nominatim.tools.special_phrases.special_phrase import SpecialPhrase
|
||||
|
||||
from cursor import CursorForTesting
|
||||
|
||||
TEST_BASE_DIR = Path(__file__) / '..' / '..'
|
||||
|
||||
def test_fetch_existing_place_classtype_tables(sp_importer, temp_db_cursor):
|
||||
def test_fetch_existing_place_classtype_tables(sp_importer, table_factory):
|
||||
"""
|
||||
Check for the fetch_existing_place_classtype_tables() method.
|
||||
It should return the table just created.
|
||||
"""
|
||||
temp_db_cursor.execute('CREATE TABLE place_classtype_testclasstypetable()')
|
||||
table_factory('place_classtype_testclasstypetable')
|
||||
|
||||
sp_importer._fetch_existing_place_classtype_tables()
|
||||
contained_table = sp_importer.table_phrases_to_delete.pop()
|
||||
@ -91,7 +93,8 @@ def test_convert_settings_giving_json(sp_importer):
|
||||
|
||||
assert returned == json_file
|
||||
|
||||
def test_create_place_classtype_indexes(temp_db_conn, sp_importer):
|
||||
def test_create_place_classtype_indexes(temp_db_with_extensions, temp_db_conn,
|
||||
table_factory, sp_importer):
|
||||
"""
|
||||
Test that _create_place_classtype_indexes() create the
|
||||
place_id index and centroid index on the right place_class_type table.
|
||||
@ -100,9 +103,7 @@ def test_create_place_classtype_indexes(temp_db_conn, sp_importer):
|
||||
phrase_type = 'type'
|
||||
table_name = 'place_classtype_{}_{}'.format(phrase_class, phrase_type)
|
||||
|
||||
with temp_db_conn.cursor() as temp_db_cursor:
|
||||
temp_db_cursor.execute("CREATE EXTENSION postgis;")
|
||||
temp_db_cursor.execute('CREATE TABLE {}(place_id BIGINT, centroid GEOMETRY)'.format(table_name))
|
||||
table_factory(table_name, 'place_id BIGINT, centroid GEOMETRY')
|
||||
|
||||
sp_importer._create_place_classtype_indexes('', phrase_class, phrase_type)
|
||||
|
||||
@ -119,7 +120,7 @@ def test_create_place_classtype_table(temp_db_conn, placex_table, sp_importer):
|
||||
|
||||
assert check_table_exist(temp_db_conn, phrase_class, phrase_type)
|
||||
|
||||
def test_grant_access_to_web_user(temp_db_conn, def_config, sp_importer):
|
||||
def test_grant_access_to_web_user(temp_db_conn, table_factory, def_config, sp_importer):
|
||||
"""
|
||||
Test that _grant_access_to_webuser() give
|
||||
right access to the web user.
|
||||
@ -128,8 +129,7 @@ def test_grant_access_to_web_user(temp_db_conn, def_config, sp_importer):
|
||||
phrase_type = 'type'
|
||||
table_name = 'place_classtype_{}_{}'.format(phrase_class, phrase_type)
|
||||
|
||||
with temp_db_conn.cursor() as temp_db_cursor:
|
||||
temp_db_cursor.execute('CREATE TABLE {}()'.format(table_name))
|
||||
table_factory(table_name)
|
||||
|
||||
sp_importer._grant_access_to_webuser(phrase_class, phrase_type)
|
||||
|
||||
@ -165,7 +165,6 @@ def test_remove_non_existent_tables_from_db(sp_importer, default_phrases,
|
||||
place_classtype tables contained in table_phrases_to_delete should
|
||||
be deleted.
|
||||
"""
|
||||
with temp_db_conn.cursor() as temp_db_cursor:
|
||||
sp_importer.table_phrases_to_delete = {
|
||||
'place_classtype_testclasstypetable_to_delete'
|
||||
}
|
||||
@ -179,15 +178,15 @@ def test_remove_non_existent_tables_from_db(sp_importer, default_phrases,
|
||||
|
||||
sp_importer._remove_non_existent_tables_from_db()
|
||||
|
||||
temp_db_cursor.execute(query_tables)
|
||||
tables_result = temp_db_cursor.fetchall()
|
||||
assert (len(tables_result) == 1 and
|
||||
tables_result[0][0] == 'place_classtype_testclasstypetable_to_keep'
|
||||
)
|
||||
# Changes are not committed yet. Use temp_db_conn for checking results.
|
||||
with temp_db_conn.cursor(cursor_factory=CursorForTesting) as cur:
|
||||
assert cur.row_set(query_tables) \
|
||||
== {('place_classtype_testclasstypetable_to_keep', )}
|
||||
|
||||
|
||||
@pytest.mark.parametrize("should_replace", [(True), (False)])
|
||||
def test_import_phrases(monkeypatch, temp_db_conn, def_config, sp_importer,
|
||||
placex_table, tokenizer_mock, should_replace):
|
||||
placex_table, table_factory, tokenizer_mock, should_replace):
|
||||
"""
|
||||
Check that the main import_phrases() method is well executed.
|
||||
It should create the place_classtype table, the place_id and centroid indexes,
|
||||
@ -197,10 +196,8 @@ def test_import_phrases(monkeypatch, temp_db_conn, def_config, sp_importer,
|
||||
"""
|
||||
#Add some data to the database before execution in order to test
|
||||
#what is deleted and what is preserved.
|
||||
with temp_db_conn.cursor() as temp_db_cursor:
|
||||
temp_db_cursor.execute("""
|
||||
CREATE TABLE place_classtype_amenity_animal_shelter();
|
||||
CREATE TABLE place_classtype_wrongclass_wrongtype();""")
|
||||
table_factory('place_classtype_amenity_animal_shelter')
|
||||
table_factory('place_classtype_wrongclass_wrongtype')
|
||||
|
||||
monkeypatch.setattr('nominatim.tools.special_phrases.sp_wiki_loader.SPWikiLoader._get_wiki_content',
|
||||
mock_get_wiki_content)
|
||||
@ -220,35 +217,10 @@ def test_import_phrases(monkeypatch, temp_db_conn, def_config, sp_importer,
|
||||
if should_replace:
|
||||
assert not check_table_exist(temp_db_conn, 'wrong_class', 'wrong_type')
|
||||
|
||||
#Format (query, should_return_something_bool) use to easily execute all asserts
|
||||
queries_tests = set()
|
||||
|
||||
#Used to check that correct place_classtype table already in the datase before is still there.
|
||||
query_existing_table = """
|
||||
SELECT table_name
|
||||
FROM information_schema.tables
|
||||
WHERE table_schema='public'
|
||||
AND table_name = 'place_classtype_amenity_animal_shelter';
|
||||
"""
|
||||
queries_tests.add((query_existing_table, True))
|
||||
|
||||
#Used to check that wrong place_classtype table was deleted from the database.
|
||||
query_wrong_table = """
|
||||
SELECT table_name
|
||||
FROM information_schema.tables
|
||||
WHERE table_schema='public'
|
||||
AND table_name = 'place_classtype_wrongclass_wrongtype';
|
||||
"""
|
||||
assert temp_db_conn.table_exists('place_classtype_amenity_animal_shelter')
|
||||
if should_replace:
|
||||
queries_tests.add((query_wrong_table, False))
|
||||
assert not temp_db_conn.table_exists('place_classtype_wrongclass_wrongtype')
|
||||
|
||||
with temp_db_conn.cursor() as temp_db_cursor:
|
||||
for query in queries_tests:
|
||||
temp_db_cursor.execute(query[0])
|
||||
if (query[1] == True):
|
||||
assert temp_db_cursor.fetchone()
|
||||
else:
|
||||
assert not temp_db_cursor.fetchone()
|
||||
|
||||
def mock_get_wiki_content(self, lang):
|
||||
"""
|
||||
@ -270,15 +242,8 @@ def check_table_exist(temp_db_conn, phrase_class, phrase_type):
|
||||
Verify that the place_classtype table exists for the given
|
||||
phrase_class and phrase_type.
|
||||
"""
|
||||
table_name = 'place_classtype_{}_{}'.format(phrase_class, phrase_type)
|
||||
return temp_db_conn.table_exists('place_classtype_{}_{}'.format(phrase_class, phrase_type))
|
||||
|
||||
with temp_db_conn.cursor() as temp_db_cursor:
|
||||
temp_db_cursor.execute("""
|
||||
SELECT *
|
||||
FROM information_schema.tables
|
||||
WHERE table_type='BASE TABLE'
|
||||
AND table_name='{}'""".format(table_name))
|
||||
return temp_db_cursor.fetchone()
|
||||
|
||||
def check_grant_access(temp_db_conn, user, phrase_class, phrase_type):
|
||||
"""
|
||||
@ -332,7 +297,6 @@ def temp_phplib_dir_with_migration():
|
||||
yield Path(phpdir)
|
||||
|
||||
@pytest.fixture
|
||||
def default_phrases(temp_db_cursor):
|
||||
temp_db_cursor.execute("""
|
||||
CREATE TABLE place_classtype_testclasstypetable_to_delete();
|
||||
CREATE TABLE place_classtype_testclasstypetable_to_keep();""")
|
||||
def default_phrases(table_factory):
|
||||
table_factory('place_classtype_testclasstypetable_to_delete')
|
||||
table_factory('place_classtype_testclasstypetable_to_keep')
|
||||
|
@ -20,7 +20,7 @@ OSM_NODE_DATA = """\
|
||||
|
||||
### init replication
|
||||
|
||||
def test_init_replication_bad_base_url(monkeypatch, status_table, place_row, temp_db_conn, temp_db_cursor):
|
||||
def test_init_replication_bad_base_url(monkeypatch, status_table, place_row, temp_db_conn):
|
||||
place_row(osm_type='N', osm_id=100)
|
||||
|
||||
monkeypatch.setattr(nominatim.db.status, "get_url", lambda u : OSM_NODE_DATA)
|
||||
@ -39,12 +39,11 @@ def test_init_replication_success(monkeypatch, status_table, place_row, temp_db_
|
||||
|
||||
nominatim.tools.replication.init_replication(temp_db_conn, 'https://test.io')
|
||||
|
||||
temp_db_cursor.execute("SELECT * FROM import_status")
|
||||
|
||||
expected_date = dt.datetime.strptime('2006-01-27T19:09:10', status.ISODATE_FORMAT)\
|
||||
.replace(tzinfo=dt.timezone.utc)
|
||||
assert temp_db_cursor.rowcount == 1
|
||||
assert temp_db_cursor.fetchone() == [expected_date, 234, True]
|
||||
|
||||
assert temp_db_cursor.row_set("SELECT * FROM import_status") \
|
||||
== {(expected_date, 234, True)}
|
||||
|
||||
|
||||
### checking for updates
|
||||
|
Loading…
Reference in New Issue
Block a user