Merge pull request #2185 from lonvia/fix-deadlock-handling-for-psycopg27

Improve deadlock detection for various versions of psycopg2
This commit is contained in:
Sarah Hoffmann 2021-02-25 18:39:40 +01:00 committed by GitHub
commit 204fe20b4b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 152 additions and 23 deletions

View File

@ -39,7 +39,7 @@ For running Nominatim:
* [PostgreSQL](https://www.postgresql.org) (9.3+)
* [PostGIS](https://postgis.net) (2.2+)
* [Python 3](https://www.python.org/) (3.5+)
* [Psycopg2](https://www.psycopg.org)
* [Psycopg2](https://www.psycopg.org) (2.7+)
* [Python Dotenv](https://github.com/theskumar/python-dotenv)
* [PHP](https://php.net) (7.0 or later)
* PHP-pgsql

View File

@ -9,8 +9,41 @@ import logging
import psycopg2
from psycopg2.extras import wait_select
# psycopg2 emits different exceptions pre and post 2.8. Detect if the new error
# module is available and adapt the error handling accordingly.
try:
import psycopg2.errors # pylint: disable=no-name-in-module,import-error
__has_psycopg2_errors__ = True
except ModuleNotFoundError:
__has_psycopg2_errors__ = False
LOG = logging.getLogger()
class DeadlockHandler:
""" Context manager that catches deadlock exceptions and calls
the given handler function. All other exceptions are passed on
normally.
"""
def __init__(self, handler):
self.handler = handler
def __enter__(self):
pass
def __exit__(self, exc_type, exc_value, traceback):
if __has_psycopg2_errors__:
if exc_type == psycopg2.errors.DeadlockDetected: # pylint: disable=E1101
self.handler()
return True
else:
if exc_type == psycopg2.extensions.TransactionRollbackError:
if exc_value.pgcode == '40P01':
self.handler()
return True
return False
class DBConnection:
""" A single non-blocking database connection.
"""
@ -24,15 +57,22 @@ class DBConnection:
self.cursor = None
self.connect()
def close(self):
""" Close all open connections. Does not wait for pending requests.
"""
if self.conn is not None:
self.cursor.close()
self.conn.close()
self.conn = None
def connect(self):
""" (Re)connect to the database. Creates an asynchronous connection
with JIT and parallel processing disabled. If a connection was
already open, it is closed and a new connection established.
The caller must ensure that no query is pending before reconnecting.
"""
if self.conn is not None:
self.cursor.close()
self.conn.close()
self.close()
# Use a dict to hand in the parameters because async is a reserved
# word in Python3.
@ -50,23 +90,18 @@ class DBConnection:
WHERE name = 'max_parallel_workers_per_gather';""")
self.wait()
def _deadlock_handler(self):
LOG.info("Deadlock detected (params = %s), retry.", str(self.current_params))
self.cursor.execute(self.current_query, self.current_params)
def wait(self):
""" Block until any pending operation is done.
"""
while True:
try:
with DeadlockHandler(self._deadlock_handler):
wait_select(self.conn)
self.current_query = None
return
except psycopg2.extensions.TransactionRollbackError as error:
if error.pgcode == '40P01':
LOG.info("Deadlock detected (params = %s), retry.",
str(self.current_params))
self.cursor.execute(self.current_query, self.current_params)
else:
raise
except psycopg2.errors.DeadlockDetected: # pylint: disable=E1101
self.cursor.execute(self.current_query, self.current_params)
def perform(self, sql, args=None):
""" Send SQL query to the server. Returns immediately without
@ -90,17 +125,9 @@ class DBConnection:
if self.current_query is None:
return True
try:
with DeadlockHandler(self._deadlock_handler):
if self.conn.poll() == psycopg2.extensions.POLL_OK:
self.current_query = None
return True
except psycopg2.extensions.TransactionRollbackError as error:
if error.pgcode == '40P01':
LOG.info("Deadlock detected (params = %s), retry.", str(self.current_params))
self.cursor.execute(self.current_query, self.current_params)
else:
raise
except psycopg2.errors.DeadlockDetected: # pylint: disable=E1101
self.cursor.execute(self.current_query, self.current_params)
return False

View File

@ -0,0 +1,102 @@
"""
Tests for function providing a non-blocking query interface towards PostgreSQL.
"""
from contextlib import closing
import concurrent.futures
import pytest
import psycopg2
from psycopg2.extras import wait_select
from nominatim.db.async_connection import DBConnection, DeadlockHandler
@pytest.fixture
def conn(temp_db):
with closing(DBConnection('dbname=' + temp_db)) as c:
yield c
@pytest.fixture
def simple_conns(temp_db):
conn1 = psycopg2.connect('dbname=' + temp_db)
conn2 = psycopg2.connect('dbname=' + temp_db)
yield conn1.cursor(), conn2.cursor()
conn1.close()
conn2.close()
def test_simple_query(conn, temp_db_conn):
conn.connect()
conn.perform('CREATE TABLE foo (id INT)')
conn.wait()
temp_db_conn.table_exists('foo')
def test_wait_for_query(conn):
conn.connect()
conn.perform('SELECT pg_sleep(1)')
assert not conn.is_done()
conn.wait()
def test_bad_query(conn):
conn.connect()
conn.perform('SELECT efasfjsea')
with pytest.raises(psycopg2.ProgrammingError):
conn.wait()
def exec_with_deadlock(cur, sql, detector):
with DeadlockHandler(lambda *args: detector.append(1)):
cur.execute(sql)
def test_deadlock(simple_conns):
print(psycopg2.__version__)
cur1, cur2 = simple_conns
cur1.execute("""CREATE TABLE t1 (id INT PRIMARY KEY, t TEXT);
INSERT into t1 VALUES (1, 'a'), (2, 'b')""")
cur1.connection.commit()
cur1.execute("UPDATE t1 SET t = 'x' WHERE id = 1")
cur2.execute("UPDATE t1 SET t = 'x' WHERE id = 2")
# This is the tricky part of the test. The first SQL command runs into
# a lock and blocks, so we have to run it in a separate thread. When the
# second deadlocking SQL statement is issued, Postgresql will abort one of
# the two transactions that cause the deadlock. There is no way to tell
# which one of the two. Therefore wrap both in a DeadlockHandler and
# expect that exactly one of the two triggers.
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
deadlock_check = []
try:
future = executor.submit(exec_with_deadlock, cur2,
"UPDATE t1 SET t = 'y' WHERE id = 1",
deadlock_check)
while not future.running():
pass
exec_with_deadlock(cur1, "UPDATE t1 SET t = 'y' WHERE id = 2",
deadlock_check)
finally:
# Whatever happens, make sure the deadlock gets resolved.
cur1.connection.rollback()
future.result()
assert len(deadlock_check) == 1