2024-06-09 15:52:20 +03:00
|
|
|
# SPDX-License-Identifier: GPL-3.0-or-later
|
2022-01-03 18:23:58 +03:00
|
|
|
#
|
|
|
|
# This file is part of Nominatim. (https://nominatim.org)
|
|
|
|
#
|
2024-06-09 15:52:20 +03:00
|
|
|
# Copyright (C) 2024 by the Nominatim developer community.
|
2022-01-03 18:23:58 +03:00
|
|
|
# For a full list of authors see the git log.
|
2021-01-23 01:25:37 +03:00
|
|
|
"""
|
|
|
|
Tests for DB utility functions in db.utils
|
|
|
|
"""
|
2021-07-21 12:37:14 +03:00
|
|
|
import json
|
|
|
|
|
2021-01-23 01:25:37 +03:00
|
|
|
import pytest
|
|
|
|
|
2024-06-09 15:52:20 +03:00
|
|
|
import nominatim_core.db.utils as db_utils
|
|
|
|
from nominatim_core.errors import UsageError
|
2021-01-23 01:25:37 +03:00
|
|
|
|
2021-02-23 21:05:51 +03:00
|
|
|
def test_execute_file_success(dsn, temp_db_cursor, tmp_path):
    """Check that a valid SQL file is executed against the database.

    The script creates a table and inserts a single row. Afterwards
    exactly that row must be readable through the test cursor.
    """
    sql_file = tmp_path / 'test.sql'
    sql_file.write_text('CREATE TABLE test (id INT);\nINSERT INTO test VALUES(56);')

    db_utils.execute_file(dsn, sql_file)

    found_rows = temp_db_cursor.row_set('SELECT * FROM test')
    assert found_rows == {(56, )}
|
2021-01-23 01:25:37 +03:00
|
|
|
|
2021-02-23 21:05:51 +03:00
|
|
|
def test_execute_file_bad_file(dsn, tmp_path):
    """Check that a path which was never created raises FileNotFoundError."""
    # Nothing is ever written to this path.
    missing_file = tmp_path / 'test2.sql'

    with pytest.raises(FileNotFoundError):
        db_utils.execute_file(dsn, missing_file)
|
|
|
|
|
2021-02-24 00:50:23 +03:00
|
|
|
|
2021-02-23 21:05:51 +03:00
|
|
|
def test_execute_file_bad_sql(dsn, tmp_path):
    """Check that a file with invalid SQL makes execute_file raise UsageError."""
    broken_sql = tmp_path / 'test.sql'
    # 'STABLE' instead of 'TABLE' makes the statement invalid on purpose.
    broken_sql.write_text('CREATE STABLE test (id INT)')

    with pytest.raises(UsageError):
        db_utils.execute_file(dsn, broken_sql)
|
|
|
|
|
2021-01-23 01:25:37 +03:00
|
|
|
|
2021-02-23 21:05:51 +03:00
|
|
|
def test_execute_file_bad_sql_ignore_errors(dsn, tmp_path):
    """Check that invalid SQL is tolerated when ignore_errors=True is given.

    There is deliberately no assertion: the test passes as long as the
    call returns without raising.
    """
    broken_sql = tmp_path / 'test.sql'
    broken_sql.write_text('CREATE STABLE test (id INT)')

    db_utils.execute_file(dsn, broken_sql, ignore_errors=True)
|
2021-02-25 00:02:13 +03:00
|
|
|
|
|
|
|
|
|
|
|
def test_execute_file_with_pre_code(dsn, tmp_path, temp_db_cursor):
    """Check that SQL passed as pre_code runs before the file content.

    The file only inserts into the table 'test'; the table itself is
    created by the pre_code statement, so the insert can only succeed
    when pre_code has been executed first.
    """
    sql_file = tmp_path / 'test.sql'
    sql_file.write_text('INSERT INTO test VALUES(4)')

    db_utils.execute_file(dsn, sql_file, pre_code='CREATE TABLE test (id INT)')

    found_rows = temp_db_cursor.row_set('SELECT * FROM test')
    assert found_rows == {(4, )}
|
2021-02-25 00:02:13 +03:00
|
|
|
|
|
|
|
|
|
|
|
def test_execute_file_with_post_code(dsn, tmp_path, temp_db_cursor):
    """Check that SQL passed as post_code runs after the file content.

    The file creates the table 'test'; the row is inserted by the
    post_code statement, so it can only be found when post_code has been
    executed after the file.
    """
    sql_file = tmp_path / 'test.sql'
    sql_file.write_text('CREATE TABLE test (id INT)')

    db_utils.execute_file(dsn, sql_file, post_code='INSERT INTO test VALUES(23)')

    found_rows = temp_db_cursor.row_set('SELECT * FROM test')
    assert found_rows == {(23, )}
|
2021-06-10 10:36:43 +03:00
|
|
|
|
|
|
|
|
|
|
|
class TestCopyBuffer:
    """Tests for db_utils.CopyBuffer writing into a table with one INT
    column (col_a) and one TEXT column (col_b).
    """
    TABLE_NAME = 'copytable'

    @pytest.fixture(autouse=True)
    def setup_test_table(self, table_factory):
        """Create the target table for every test in this class."""
        table_factory(self.TABLE_NAME, 'col_a INT, col_b TEXT')

    def table_rows(self, cursor):
        """Return the complete content of the target table as delivered
        by the cursor's row_set() helper.
        """
        query = 'SELECT * FROM ' + self.TABLE_NAME
        return cursor.row_set(query)

    def test_copybuffer_empty(self):
        """Copying out a buffer without any rows must not raise, even
        when no cursor is supplied.
        """
        with db_utils.CopyBuffer() as copy_buf:
            copy_buf.copy_out(None, "dummy")

    def test_all_columns(self, temp_db_cursor):
        """Rows added with a value for every column end up in the table,
        including a None integer and a text containing a backslash.
        """
        expected = {(3, 'hum'), (None, 'f\\t')}

        with db_utils.CopyBuffer() as copy_buf:
            copy_buf.add(3, 'hum')
            copy_buf.add(None, 'f\\t')

            copy_buf.copy_out(temp_db_cursor, self.TABLE_NAME)

            assert self.table_rows(temp_db_cursor) == expected

    def test_selected_columns(self, temp_db_cursor):
        """When only col_b is named in 'columns', col_a is read back as None."""
        with db_utils.CopyBuffer() as copy_buf:
            copy_buf.add('foo')

            copy_buf.copy_out(temp_db_cursor, self.TABLE_NAME, columns=['col_b'])

            assert self.table_rows(temp_db_cursor) == {(None, 'foo')}

    def test_reordered_columns(self, temp_db_cursor):
        """Values are assigned following the order given in 'columns',
        not the order of the columns in the table. Surrounding spaces in
        text values are kept.
        """
        expected = {(1, 'one'), (2, ' two ')}

        with db_utils.CopyBuffer() as copy_buf:
            copy_buf.add('one', 1)
            copy_buf.add(' two ', 2)

            copy_buf.copy_out(temp_db_cursor, self.TABLE_NAME, columns=['col_b', 'col_a'])

            assert self.table_rows(temp_db_cursor) == expected

    def test_special_characters(self, temp_db_cursor):
        """Text containing a tab, a newline or a literal backslash-N
        sequence is read back unchanged.
        """
        expected = {
            (None, 'foo\tbar'),
            (None, 'sun\nson'),
            (None, '\\N'),
        }

        with db_utils.CopyBuffer() as copy_buf:
            copy_buf.add('foo\tbar')
            copy_buf.add('sun\nson')
            copy_buf.add('\\N')

            copy_buf.copy_out(temp_db_cursor, self.TABLE_NAME, columns=['col_b'])

            assert self.table_rows(temp_db_cursor) == expected
|
|
|
|
|
|
|
|
|
|
|
|
|
2021-07-21 12:37:14 +03:00
|
|
|
class TestCopyBufferJson:
    """Tests for db_utils.CopyBuffer writing JSON strings into a table
    with one INT column (col_a) and one JSONB column (col_b).
    """
    TABLE_NAME = 'copytable'

    @pytest.fixture(autouse=True)
    def setup_test_table(self, table_factory):
        """Create the target table for every test in this class."""
        table_factory(self.TABLE_NAME, 'col_a INT, col_b JSONB')

    def table_rows(self, cursor):
        """Return the table content as a dictionary mapping the value of
        the first column to the value of the second column.
        """
        cursor.execute('SELECT * FROM ' + self.TABLE_NAME)

        content = {}
        for key, value in cursor:
            content[key] = value

        # Rows sharing the same first column would silently collapse
        # into one dictionary entry, so make sure none got lost.
        assert len(content) == cursor.rowcount

        return content

    def test_json_object(self, temp_db_cursor):
        """A simple JSON object is read back as the equivalent dictionary."""
        expected = {1: {'test': 'value', 'number': 1}}

        with db_utils.CopyBuffer() as copy_buf:
            copy_buf.add(1, json.dumps({'test': 'value', 'number': 1}))

            copy_buf.copy_out(temp_db_cursor, self.TABLE_NAME)

            assert self.table_rows(temp_db_cursor) == expected

    def test_json_object_special_chras(self, temp_db_cursor):
        """A JSON object with a tab, a newline, a double quote and a
        null value is read back unchanged.
        """
        expected = {1: {'te\tst': 'va\nlue', 'nu"mber': None}}

        with db_utils.CopyBuffer() as copy_buf:
            copy_buf.add(1, json.dumps({'te\tst': 'va\nlue', 'nu"mber': None}))

            copy_buf.copy_out(temp_db_cursor, self.TABLE_NAME)

            assert self.table_rows(temp_db_cursor) == expected
|