diff --git a/nominatim/db/async_connection.py b/nominatim/db/async_connection.py index fe8b3006..db4b89ce 100644 --- a/nominatim/db/async_connection.py +++ b/nominatim/db/async_connection.py @@ -28,8 +28,9 @@ class DeadlockHandler: normally. """ - def __init__(self, handler): + def __init__(self, handler, ignore_sql_errors=False): self.handler = handler + self.ignore_sql_errors = ignore_sql_errors def __enter__(self): pass @@ -44,6 +45,11 @@ class DeadlockHandler: if exc_value.pgcode == '40P01': self.handler() return True + + if self.ignore_sql_errors and isinstance(exc_value, psycopg2.Error): + LOG.info("SQL error ignored: %s", exc_value) + return True + return False @@ -51,10 +57,11 @@ class DBConnection: """ A single non-blocking database connection. """ - def __init__(self, dsn, cursor_factory=None): + def __init__(self, dsn, cursor_factory=None, ignore_sql_errors=False): self.current_query = None self.current_params = None self.dsn = dsn + self.ignore_sql_errors = ignore_sql_errors self.conn = None self.cursor = None @@ -101,7 +108,7 @@ class DBConnection: """ Block until any pending operation is done. """ while True: - with DeadlockHandler(self._deadlock_handler): + with DeadlockHandler(self._deadlock_handler, self.ignore_sql_errors): wait_select(self.conn) self.current_query = None return @@ -128,7 +135,7 @@ class DBConnection: if self.current_query is None: return True - with DeadlockHandler(self._deadlock_handler): + with DeadlockHandler(self._deadlock_handler, self.ignore_sql_errors): if self.conn.poll() == psycopg2.extensions.POLL_OK: self.current_query = None return True @@ -143,8 +150,9 @@ class WorkerPool: """ REOPEN_CONNECTIONS_AFTER = 100000 - def __init__(self, dsn, pool_size): - self.threads = [DBConnection(dsn) for _ in range(pool_size)] + def __init__(self, dsn, pool_size, ignore_sql_errors=False): + self.threads = [DBConnection(dsn, ignore_sql_errors=ignore_sql_errors) + for _ in range(pool_size)] self.free_workers = self._yield_free_worker() self.wait_time = 0 diff --git a/nominatim/tools/tiger_data.py b/nominatim/tools/tiger_data.py index 07772c70..fbcdb077 100644 --- a/nominatim/tools/tiger_data.py +++ b/nominatim/tools/tiger_data.py @@ -4,10 +4,9 @@ Functions for importing tiger data and handling tarbar and directory files import logging import os import tarfile -import selectors from nominatim.db.connection import connect -from nominatim.db.async_connection import DBConnection +from nominatim.db.async_connection import WorkerPool from nominatim.db.sql_preprocessor import SQLPreprocessor @@ -37,44 +36,20 @@ def handle_tarfile_or_directory(data_dir): return sql_files, tar -def handle_threaded_sql_statements(sel, file): +def handle_threaded_sql_statements(pool, file): """ Handles sql statement with multiplexing """ lines = 0 - end_of_file = False # Using pool of database connections to execute sql statements - while not end_of_file: - for key, _ in sel.select(1): - conn = key.data - try: - if conn.is_done(): - sql_query = file.readline() - lines += 1 - if not sql_query: - end_of_file = True - break - conn.perform(sql_query) - if lines == 1000: - print('. ', end='', flush=True) - lines = 0 - except Exception as exc: # pylint: disable=broad-except - LOG.info('Wrong SQL statement: %s', exc) + for sql_query in file: + pool.next_free_worker().perform(sql_query) -def handle_unregister_connection_pool(sel, place_threads): - """ Handles unregistering pool of connections - """ + lines += 1 + if lines == 1000: + print('.', end='', flush=True) + lines = 0 - while place_threads > 0: - for key, _ in sel.select(1): - conn = key.data - sel.unregister(conn) - try: - conn.wait() - except Exception as exc: # pylint: disable=broad-except - LOG.info('Wrong SQL statement: %s', exc) - conn.close() - place_threads -= 1 def add_tiger_data(data_dir, config, threads): """ Import tiger data from directory or tar file `data dir`. @@ -91,25 +66,16 @@ def add_tiger_data(data_dir, config, threads): # Reading sql_files and then for each file line handling # sql_query in chunks. - sel = selectors.DefaultSelector() place_threads = max(1, threads - 1) - # Creates a pool of database connections - for _ in range(place_threads): - conn = DBConnection(dsn) - conn.connect() - sel.register(conn, selectors.EVENT_WRITE, conn) + with WorkerPool(dsn, place_threads, ignore_sql_errors=True) as pool: + for sql_file in sql_files: + if not tar: + file = open(sql_file) + else: + file = tar.extractfile(sql_file) - for sql_file in sql_files: - if not tar: - file = open(sql_file) - else: - file = tar.extractfile(sql_file) - - handle_threaded_sql_statements(sel, file) - - # Unregistering pool of database connections - handle_unregister_connection_pool(sel, place_threads) + handle_threaded_sql_statements(pool, file) if tar: tar.close() diff --git a/test/python/test_db_async_connection.py b/test/python/test_db_async_connection.py index b52f7053..330b86f7 100644 --- a/test/python/test_db_async_connection.py +++ b/test/python/test_db_async_connection.py @@ -56,13 +56,21 @@ def test_bad_query(conn): conn.wait() +def test_bad_query_ignore(temp_db): + with closing(DBConnection('dbname=' + temp_db, ignore_sql_errors=True)) as conn: + conn.connect() + + conn.perform('SELECT efasfjsea') + + conn.wait() + + def exec_with_deadlock(cur, sql, detector): with DeadlockHandler(lambda *args: detector.append(1)): cur.execute(sql) def test_deadlock(simple_conns): - print(psycopg2.__version__) cur1, cur2 = simple_conns cur1.execute("""CREATE TABLE t1 (id INT PRIMARY KEY, t TEXT);