1
1
mirror of https://github.com/yandex/pgmigrate.git synced 2024-11-09 16:36:05 +03:00

Add blocking pids termination

This commit is contained in:
secwall 2017-12-03 01:39:30 +03:00
parent 2ca64cb340
commit dc183150b2
6 changed files with 187 additions and 26 deletions

View File

@ -370,3 +370,26 @@ lock timeout to 30 seconds one could do something like this:
pgmigrate -s "SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL SERIALIZABLE" \
-s "SET lock_timeout = '30s'" ...
```
## Terminating blocking pids
On heavy loaded production environments running some migrations
could block queries by application backends.
Unfortunately if migration is blocked by some other query it could lead
to really slow database queries.
For example lock queue like this:
```
<lots of app backends>
<pgmigrate>
<stale backend in idle in transaction>
```
makes database almost unavailable for at least `idle_in_transaction_timeout`.
To mitigate such issues there is `-l <interval>` option in pgmigrate
which starts separate thread running `pg_terminate_backend(pid)` for
each pid blocking any of pgmigrate conn pids every `interval` seconds.
Of course pgmigrate should be able to terminate other pids so migration user
should be the app user or have `pg_signal_backend` grant. To terminate
superuser (e.g. `postgres`) pids one could run pgmigrate with superuser.
Note: this feature relays on `pg_blocking_pids()` function available since
PostgreSQL 9.6.

View File

@ -0,0 +1,40 @@
Feature: Conflicting pids termination
Scenario: Transactional migration blocked by update passes
Given migration dir
And migrations
| file | code |
| V1__Create_test_table.sql | CREATE TABLE test (id bigint); |
| V2__Insert_test_data.sql | INSERT INTO test (id) VALUES (1); |
| V3__Alter_test_table.sql | ALTER TABLE test ADD COLUMN test text; |
And database and connection
And successful pgmigrate run with "-t 2 migrate"
And not commited query "UPDATE test SET id = 2 WHERE id = 1"
When we run pgmigrate with "-l 0.1 -t 3 migrate"
Then pgmigrate command "succeeded"
Scenario: Nontransactional migration blocked by update passes
Given migration dir
And migrations
| file | code |
| V1__Create_test_table.sql | CREATE TABLE test (id bigint); |
| V2__Insert_test_data.sql | INSERT INTO test (id) VALUES (1); |
| V3__NONTRANSACTIONAL_migration.sql | ALTER TABLE test ADD COLUMN test text; |
And database and connection
And successful pgmigrate run with "-t 2 migrate"
And not commited query "UPDATE test SET id = 2 WHERE id = 1"
When we run pgmigrate with "-l 0.1 -t 3 migrate"
Then pgmigrate command "succeeded"
Scenario: Mixed transactional and nontransactional migrations blocked by update pass
Given migration dir
And migrations
| file | code |
| V1__Transactional_migration.sql | ALTER TABLE test ADD COLUMN test text; |
| V2__NONTRANSACTIONAL_migration.sql | ALTER TABLE test ADD COLUMN test2 text; |
And database and connection
And query "CREATE TABLE test (id bigint)"
And query "INSERT INTO test (id) VALUES (1)"
And not commited query "UPDATE test SET id = 2 WHERE id = 1"
When we run pgmigrate with "-l 0.1 -t 2 migrate"
Then pgmigrate command "succeeded"

View File

@ -1,6 +1,12 @@
from behave import given, then
@given('not commited query "{query}"') # noqa
def step_impl(context, query):
cur = context.conn.cursor()
cur.execute(query)
@given('query "{query}"') # noqa
def step_impl(context, query):
cur = context.conn.cursor()

View File

@ -3,6 +3,7 @@ import subprocess
import sys
import yaml
from func_timeout import FunctionTimedOut, func_timeout
from behave import given, then, when
@ -15,7 +16,11 @@ def run_pgmigrate(migr_dir, args):
p = subprocess.Popen(cmd, stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
stdout, stderr = p.communicate()
try:
stdout, stderr = func_timeout(5, p.communicate)
except FunctionTimedOut:
p.terminate()
stdout, stderr = p.communicate()
return p.returncode, str(stdout), str(stderr)
@given('successful pgmigrate run with "{args}"')

View File

@ -32,8 +32,11 @@ import logging
import os
import re
import sys
import threading
import time
from builtins import str as text
from collections import OrderedDict, namedtuple
from contextlib import closing
import psycopg2
import sqlparse
@ -85,13 +88,69 @@ class BaselineError(MigrateError):
pass
class ConflictTerminator(threading.Thread):
"""
Kills conflicting pids (only on postgresql > 9.6)
"""
def __init__(self, conn_str, interval):
threading.Thread.__init__(self, name='terminator')
self.daemon = True
self.log = logging.getLogger('terminator')
self.conn_str = conn_str
self.pids = set()
self.interval = interval
self.should_run = True
self.conn = None
def stop(self):
"""
Stop iterations and close connection
"""
self.should_run = False
def add_conn(self, conn):
"""
Add conn pid to pgmirate pids list
"""
self.pids.add(conn.get_backend_pid())
def remove_conn(self, conn):
"""
Remove conn from pgmigrate pids list
"""
self.pids.remove(conn.get_backend_pid())
def run(self):
"""
Periodically terminate all backends blocking pgmigrate pids
"""
self.conn = _create_raw_connection(self.conn_str, self.log)
self.conn.autocommit = True
while self.should_run:
with self.conn.cursor() as cursor:
for pid in self.pids:
cursor.execute(
'SELECT pg_terminate_backend(pid) FROM '
'unnest(pg_blocking_pids(%s)) AS pid',
(pid,))
time.sleep(self.interval)
REF_COLUMNS = ['version', 'description', 'type',
'installed_by', 'installed_on']
def _create_connection(conn_string):
def _create_raw_connection(conn_string, logger=LOG):
conn = psycopg2.connect(conn_string, connection_factory=LoggingConnection)
conn.initialize(LOG)
conn.initialize(logger)
return conn
def _create_connection(config):
conn = _create_raw_connection(config.conn)
if config.terminator_instance:
config.terminator_instance.add_conn(conn)
return conn
@ -146,9 +205,10 @@ Callbacks = namedtuple('Callbacks', ('beforeAll', 'beforeEach',
Config = namedtuple('Config', ('target', 'baseline', 'cursor', 'dryrun',
'callbacks', 'user', 'base_dir', 'conn',
'session', 'conn_instance'))
'session', 'conn_instance',
'terminator_instance', 'termination_interval'))
CONFIG_IGNORE = ['cursor', 'conn_instance']
CONFIG_IGNORE = ['cursor', 'conn_instance', 'terminator_instance']
def _get_migrations_info_from_dir(base_dir):
@ -473,6 +533,9 @@ def _finish(config):
config.cursor.execute('rollback')
else:
config.cursor.execute('commit')
if config.terminator_instance:
config.terminator_instance.stop()
config.conn_instance.close()
def info(config, stdout=True):
@ -558,6 +621,21 @@ def _prepare_nontransactional_steps(state, callbacks):
return steps
def _execute_mixed_steps(config, steps, nt_conn):
commit_req = False
for step in steps:
if commit_req:
config.cursor.execute('commit')
commit_req = False
if not list(step['state'].values())[0]['transactional']:
cur = _init_cursor(nt_conn, config.session)
else:
cur = config.cursor
commit_req = True
_migrate_step(step['state'], step['cbs'],
config.base_dir, config.user, cur)
def migrate(config):
"""
Migrate cmdline wrapper
@ -585,29 +663,23 @@ def migrate(config):
raise MigrateError('Unable to mix transactional and '
'nontransactional migrations')
config.cursor.execute('rollback')
nt_conn = _create_connection(config.conn)
nt_conn.autocommit = True
cursor = _init_cursor(nt_conn, config.session)
_migrate_step(state, _get_callbacks(''),
config.base_dir, config.user, cursor)
with closing(_create_connection(config)) as nt_conn:
nt_conn.autocommit = True
cursor = _init_cursor(nt_conn, config.session)
_migrate_step(state, _get_callbacks(''),
config.base_dir, config.user, cursor)
if config.terminator_instance:
config.terminator_instance.remove_conn(nt_conn)
else:
steps = _prepare_nontransactional_steps(state, config.callbacks)
nt_conn = _create_connection(config.conn)
nt_conn.autocommit = True
with closing(_create_connection(config)) as nt_conn:
nt_conn.autocommit = True
commit_req = False
for step in steps:
if commit_req:
config.cursor.execute('commit')
commit_req = False
if not list(step['state'].values())[0]['transactional']:
cur = _init_cursor(nt_conn, config.session)
else:
cur = config.cursor
commit_req = True
_migrate_step(step['state'], step['cbs'],
config.base_dir, config.user, cur)
_execute_mixed_steps(config, steps, nt_conn)
if config.terminator_instance:
config.terminator_instance.remove_conn(nt_conn)
else:
_migrate_step(state, config.callbacks, config.base_dir,
config.user, config.cursor)
@ -627,7 +699,9 @@ CONFIG_DEFAULTS = Config(target=None, baseline=0, cursor=None, dryrun=False,
session=['SET lock_timeout = 0'],
conn='dbname=postgres user=postgres '
'connect_timeout=1',
conn_instance=None)
conn_instance=None,
terminator_instance=None,
termination_interval=None)
def get_config(base_dir, args=None):
@ -656,7 +730,13 @@ def get_config(base_dir, args=None):
else:
conf = conf._replace(target=int(conf.target))
conf = conf._replace(conn_instance=_create_connection(conf.conn))
if conf.termination_interval and not conf.dryrun:
conf = conf._replace(
terminator_instance=ConflictTerminator(
conf.conn, conf.termination_interval))
conf.terminator_instance.start()
conf = conf._replace(conn_instance=_create_connection(conf))
conf = conf._replace(cursor=_init_cursor(conf.conn_instance, conf.session))
conf = conf._replace(callbacks=_get_callbacks(conf.callbacks,
conf.base_dir))
@ -705,6 +785,10 @@ def _main():
parser.add_argument('-n', '--dryrun',
action='store_true',
help='Say "rollback" in the end instead of "commit"')
parser.add_argument('-l', '--termination_interval',
type=float,
help='Inverval for terminating blocking pids '
'(only for transational migrations)')
parser.add_argument('-v', '--verbose',
default=0,
action='count',

View File

@ -17,6 +17,7 @@ commands = rm -rf htmlcov
deps = behave
importlib
coverage
func_timeout
[testenv:py35]
whitelist_externals = rm
@ -28,6 +29,7 @@ commands = rm -rf htmlcov
coverage report --fail-under=100 pgmigrate.py
deps = behave
coverage
func_timeout
[testenv:py36]
whitelist_externals = rm
@ -39,6 +41,7 @@ commands = rm -rf htmlcov
coverage report --fail-under=100 pgmigrate.py
deps = behave
coverage
func_timeout
[testenv:flake8]
commands = flake8 pgmigrate.py