From dc183150b28fd7ae377f4de21c80e17bc4ea02fc Mon Sep 17 00:00:00 2001 From: secwall Date: Sun, 3 Dec 2017 01:39:30 +0300 Subject: [PATCH] Add blocking pids termination --- doc/tutorial.md | 23 +++++ features/conflicting_pids.feature | 40 +++++++++ features/steps/query.py | 6 ++ features/steps/run_pgmigrate.py | 7 +- pgmigrate.py | 134 ++++++++++++++++++++++++------ tox.ini | 3 + 6 files changed, 187 insertions(+), 26 deletions(-) create mode 100644 features/conflicting_pids.feature diff --git a/doc/tutorial.md b/doc/tutorial.md index 19ad510..c835f41 100644 --- a/doc/tutorial.md +++ b/doc/tutorial.md @@ -370,3 +370,26 @@ lock timeout to 30 seconds one could do something like this: pgmigrate -s "SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL SERIALIZABLE" \ -s "SET lock_timeout = '30s'" ... ``` + +## Terminating blocking pids + +On heavy loaded production environments running some migrations +could block queries by application backends. +Unfortunately if migration is blocked by some other query it could lead +to really slow database queries. +For example lock queue like this: +``` + + + +``` +makes database almost unavailable for at least `idle_in_transaction_timeout`. +To mitigate such issues there is `-l ` option in pgmigrate +which starts separate thread running `pg_terminate_backend(pid)` for +each pid blocking any of pgmigrate conn pids every `interval` seconds. +Of course pgmigrate should be able to terminate other pids so migration user +should be the app user or have `pg_signal_backend` grant. To terminate +superuser (e.g. `postgres`) pids one could run pgmigrate with superuser. + +Note: this feature relays on `pg_blocking_pids()` function available since +PostgreSQL 9.6. diff --git a/features/conflicting_pids.feature b/features/conflicting_pids.feature new file mode 100644 index 0000000..18b1c80 --- /dev/null +++ b/features/conflicting_pids.feature @@ -0,0 +1,40 @@ +Feature: Conflicting pids termination + + Scenario: Transactional migration blocked by update passes + Given migration dir + And migrations + | file | code | + | V1__Create_test_table.sql | CREATE TABLE test (id bigint); | + | V2__Insert_test_data.sql | INSERT INTO test (id) VALUES (1); | + | V3__Alter_test_table.sql | ALTER TABLE test ADD COLUMN test text; | + And database and connection + And successful pgmigrate run with "-t 2 migrate" + And not commited query "UPDATE test SET id = 2 WHERE id = 1" + When we run pgmigrate with "-l 0.1 -t 3 migrate" + Then pgmigrate command "succeeded" + + Scenario: Nontransactional migration blocked by update passes + Given migration dir + And migrations + | file | code | + | V1__Create_test_table.sql | CREATE TABLE test (id bigint); | + | V2__Insert_test_data.sql | INSERT INTO test (id) VALUES (1); | + | V3__NONTRANSACTIONAL_migration.sql | ALTER TABLE test ADD COLUMN test text; | + And database and connection + And successful pgmigrate run with "-t 2 migrate" + And not commited query "UPDATE test SET id = 2 WHERE id = 1" + When we run pgmigrate with "-l 0.1 -t 3 migrate" + Then pgmigrate command "succeeded" + + Scenario: Mixed transactional and nontransactional migrations blocked by update pass + Given migration dir + And migrations + | file | code | + | V1__Transactional_migration.sql | ALTER TABLE test ADD COLUMN test text; | + | V2__NONTRANSACTIONAL_migration.sql | ALTER TABLE test ADD COLUMN test2 text; | + And database and connection + And query "CREATE TABLE test (id bigint)" + And query "INSERT INTO test (id) VALUES (1)" + And not commited query "UPDATE test SET id = 2 WHERE id = 1" + When we run pgmigrate with "-l 0.1 -t 2 migrate" + Then pgmigrate command "succeeded" diff --git a/features/steps/query.py b/features/steps/query.py index 71e1153..8556149 100644 --- a/features/steps/query.py +++ b/features/steps/query.py @@ -1,6 +1,12 @@ from behave import given, then +@given('not commited query "{query}"') # noqa +def step_impl(context, query): + cur = context.conn.cursor() + cur.execute(query) + + @given('query "{query}"') # noqa def step_impl(context, query): cur = context.conn.cursor() diff --git a/features/steps/run_pgmigrate.py b/features/steps/run_pgmigrate.py index 55f0b82..fae0897 100644 --- a/features/steps/run_pgmigrate.py +++ b/features/steps/run_pgmigrate.py @@ -3,6 +3,7 @@ import subprocess import sys import yaml +from func_timeout import FunctionTimedOut, func_timeout from behave import given, then, when @@ -15,7 +16,11 @@ def run_pgmigrate(migr_dir, args): p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - stdout, stderr = p.communicate() + try: + stdout, stderr = func_timeout(5, p.communicate) + except FunctionTimedOut: + p.terminate() + stdout, stderr = p.communicate() return p.returncode, str(stdout), str(stderr) @given('successful pgmigrate run with "{args}"') diff --git a/pgmigrate.py b/pgmigrate.py index 86db3ff..51b648b 100755 --- a/pgmigrate.py +++ b/pgmigrate.py @@ -32,8 +32,11 @@ import logging import os import re import sys +import threading +import time from builtins import str as text from collections import OrderedDict, namedtuple +from contextlib import closing import psycopg2 import sqlparse @@ -85,13 +88,69 @@ class BaselineError(MigrateError): pass +class ConflictTerminator(threading.Thread): + """ + Kills conflicting pids (only on postgresql > 9.6) + """ + def __init__(self, conn_str, interval): + threading.Thread.__init__(self, name='terminator') + self.daemon = True + self.log = logging.getLogger('terminator') + self.conn_str = conn_str + self.pids = set() + self.interval = interval + self.should_run = True + self.conn = None + + def stop(self): + """ + Stop iterations and close connection + """ + self.should_run = False + + def add_conn(self, conn): + """ + Add conn pid to pgmirate pids list + """ + self.pids.add(conn.get_backend_pid()) + + def remove_conn(self, conn): + """ + Remove conn from pgmigrate pids list + """ + self.pids.remove(conn.get_backend_pid()) + + def run(self): + """ + Periodically terminate all backends blocking pgmigrate pids + """ + self.conn = _create_raw_connection(self.conn_str, self.log) + self.conn.autocommit = True + while self.should_run: + with self.conn.cursor() as cursor: + for pid in self.pids: + cursor.execute( + 'SELECT pg_terminate_backend(pid) FROM ' + 'unnest(pg_blocking_pids(%s)) AS pid', + (pid,)) + time.sleep(self.interval) + + REF_COLUMNS = ['version', 'description', 'type', 'installed_by', 'installed_on'] -def _create_connection(conn_string): +def _create_raw_connection(conn_string, logger=LOG): conn = psycopg2.connect(conn_string, connection_factory=LoggingConnection) - conn.initialize(LOG) + conn.initialize(logger) + + return conn + + +def _create_connection(config): + conn = _create_raw_connection(config.conn) + if config.terminator_instance: + config.terminator_instance.add_conn(conn) return conn @@ -146,9 +205,10 @@ Callbacks = namedtuple('Callbacks', ('beforeAll', 'beforeEach', Config = namedtuple('Config', ('target', 'baseline', 'cursor', 'dryrun', 'callbacks', 'user', 'base_dir', 'conn', - 'session', 'conn_instance')) + 'session', 'conn_instance', + 'terminator_instance', 'termination_interval')) -CONFIG_IGNORE = ['cursor', 'conn_instance'] +CONFIG_IGNORE = ['cursor', 'conn_instance', 'terminator_instance'] def _get_migrations_info_from_dir(base_dir): @@ -473,6 +533,9 @@ def _finish(config): config.cursor.execute('rollback') else: config.cursor.execute('commit') + if config.terminator_instance: + config.terminator_instance.stop() + config.conn_instance.close() def info(config, stdout=True): @@ -558,6 +621,21 @@ def _prepare_nontransactional_steps(state, callbacks): return steps +def _execute_mixed_steps(config, steps, nt_conn): + commit_req = False + for step in steps: + if commit_req: + config.cursor.execute('commit') + commit_req = False + if not list(step['state'].values())[0]['transactional']: + cur = _init_cursor(nt_conn, config.session) + else: + cur = config.cursor + commit_req = True + _migrate_step(step['state'], step['cbs'], + config.base_dir, config.user, cur) + + def migrate(config): """ Migrate cmdline wrapper @@ -585,29 +663,23 @@ def migrate(config): raise MigrateError('Unable to mix transactional and ' 'nontransactional migrations') config.cursor.execute('rollback') - nt_conn = _create_connection(config.conn) - nt_conn.autocommit = True - cursor = _init_cursor(nt_conn, config.session) - _migrate_step(state, _get_callbacks(''), - config.base_dir, config.user, cursor) + with closing(_create_connection(config)) as nt_conn: + nt_conn.autocommit = True + cursor = _init_cursor(nt_conn, config.session) + _migrate_step(state, _get_callbacks(''), + config.base_dir, config.user, cursor) + if config.terminator_instance: + config.terminator_instance.remove_conn(nt_conn) else: steps = _prepare_nontransactional_steps(state, config.callbacks) - nt_conn = _create_connection(config.conn) - nt_conn.autocommit = True + with closing(_create_connection(config)) as nt_conn: + nt_conn.autocommit = True - commit_req = False - for step in steps: - if commit_req: - config.cursor.execute('commit') - commit_req = False - if not list(step['state'].values())[0]['transactional']: - cur = _init_cursor(nt_conn, config.session) - else: - cur = config.cursor - commit_req = True - _migrate_step(step['state'], step['cbs'], - config.base_dir, config.user, cur) + _execute_mixed_steps(config, steps, nt_conn) + + if config.terminator_instance: + config.terminator_instance.remove_conn(nt_conn) else: _migrate_step(state, config.callbacks, config.base_dir, config.user, config.cursor) @@ -627,7 +699,9 @@ CONFIG_DEFAULTS = Config(target=None, baseline=0, cursor=None, dryrun=False, session=['SET lock_timeout = 0'], conn='dbname=postgres user=postgres ' 'connect_timeout=1', - conn_instance=None) + conn_instance=None, + terminator_instance=None, + termination_interval=None) def get_config(base_dir, args=None): @@ -656,7 +730,13 @@ def get_config(base_dir, args=None): else: conf = conf._replace(target=int(conf.target)) - conf = conf._replace(conn_instance=_create_connection(conf.conn)) + if conf.termination_interval and not conf.dryrun: + conf = conf._replace( + terminator_instance=ConflictTerminator( + conf.conn, conf.termination_interval)) + conf.terminator_instance.start() + + conf = conf._replace(conn_instance=_create_connection(conf)) conf = conf._replace(cursor=_init_cursor(conf.conn_instance, conf.session)) conf = conf._replace(callbacks=_get_callbacks(conf.callbacks, conf.base_dir)) @@ -705,6 +785,10 @@ def _main(): parser.add_argument('-n', '--dryrun', action='store_true', help='Say "rollback" in the end instead of "commit"') + parser.add_argument('-l', '--termination_interval', + type=float, + help='Inverval for terminating blocking pids ' + '(only for transational migrations)') parser.add_argument('-v', '--verbose', default=0, action='count', diff --git a/tox.ini b/tox.ini index 48b0c69..a869257 100644 --- a/tox.ini +++ b/tox.ini @@ -17,6 +17,7 @@ commands = rm -rf htmlcov deps = behave importlib coverage + func_timeout [testenv:py35] whitelist_externals = rm @@ -28,6 +29,7 @@ commands = rm -rf htmlcov coverage report --fail-under=100 pgmigrate.py deps = behave coverage + func_timeout [testenv:py36] whitelist_externals = rm @@ -39,6 +41,7 @@ commands = rm -rf htmlcov coverage report --fail-under=100 pgmigrate.py deps = behave coverage + func_timeout [testenv:flake8] commands = flake8 pgmigrate.py