Mirror of https://github.com/yandex/pgmigrate.git

Upgrade to bionic, drop python 3.5 and add yapf

secwall 2018-08-18 22:34:31 +03:00
parent 676082c3a7
commit 8f109e797d
3 changed files with 151 additions and 153 deletions

Dockerfile

@@ -1,5 +1,5 @@
# vim:set ft=dockerfile:
FROM ubuntu:xenial
FROM ubuntu:bionic
# explicitly set user/group IDs
RUN groupadd -r postgres --gid=999 && useradd -r -d /var/lib/postgresql -g postgres --uid=999 postgres
@@ -9,26 +9,27 @@ RUN apt-get update && apt-get install -y software-properties-common locales && \
rm -rf /var/lib/apt/lists/* && \
localedef -i en_US -c -f UTF-8 -A /usr/share/locale/locale.alias en_US.UTF-8
ENV LANG en_US.utf8
ENV DEBIAN_FRONTEND noninteractive
RUN apt-key adv --keyserver ha.pool.sks-keyservers.net --recv-keys B97B0AFCAA1A47F044F244A07FCC7D46ACCC4CF8
ENV PG_MAJOR 10
RUN echo 'deb http://apt.postgresql.org/pub/repos/apt/ xenial-pgdg main' $PG_MAJOR > /etc/apt/sources.list.d/pgdg.list
RUN add-apt-repository ppa:jonathonf/python-3.6
RUN echo 'deb http://apt.postgresql.org/pub/repos/apt/ bionic-pgdg main' $PG_MAJOR > /etc/apt/sources.list.d/pgdg.list
RUN apt-get update \
&& apt-get install -y postgresql-common \
&& apt-get \
-o Dpkg::Options::="--force-confdef" \
-o Dpkg::Options::="--force-confold" \
install -y postgresql-common \
sudo \
libpq-dev \
python-pip \
python3.5-dev \
python3-pip \
python2.7-dev \
python3.6-dev \
postgresql-$PG_MAJOR \
postgresql-contrib-$PG_MAJOR \
&& pip install tox
&& pip3 install tox
COPY ./ /dist

pgmigrate.py

@@ -92,6 +92,7 @@ class ConflictTerminator(threading.Thread):
"""
Kills conflicting pids (only on postgresql > 9.6)
"""
def __init__(self, conn_str, interval):
threading.Thread.__init__(self, name='terminator')
self.daemon = True
@@ -131,16 +132,20 @@ class ConflictTerminator(threading.Thread):
for pid in self.pids:
cursor.execute(
'SELECT pid, pg_terminate_backend(pid) FROM '
'unnest(pg_blocking_pids(%s)) AS pid',
(pid,))
'unnest(pg_blocking_pids(%s)) AS pid', (pid, ))
terminated = [x[0] for x in cursor.fetchall()]
for i in terminated:
self.log.info('Terminated conflicting pid: %s', i)
time.sleep(self.interval)
REF_COLUMNS = ['version', 'description', 'type',
'installed_by', 'installed_on']
REF_COLUMNS = [
'version',
'description',
'type',
'installed_by',
'installed_on',
]
def _create_raw_connection(conn_string, logger=LOG):
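For context, the termination loop in the hunk above issues one query per tracked backend: pg_blocking_pids() lists whoever is blocking it and pg_terminate_backend() kills them. A minimal standalone sketch of that query, not part of this commit (assumes psycopg2 is installed and PostgreSQL 9.6+ for pg_blocking_pids):

# Illustrative sketch only, not from the commit: terminate whatever is
# currently blocking the backend with the given pid, using the same query
# as ConflictTerminator above.
import psycopg2

def terminate_blockers(conn_str, pid):
    with psycopg2.connect(conn_str) as conn:
        conn.autocommit = True
        with conn.cursor() as cursor:
            cursor.execute(
                'SELECT pid, pg_terminate_backend(pid) FROM '
                'unnest(pg_blocking_pids(%s)) AS pid', (pid, ))
            # Return the pids that were terminated
            return [row[0] for row in cursor.fetchall()]

# Example call (hypothetical connection string and pid):
# terminate_blockers('dbname=postgres user=postgres', 12345)
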
@@ -174,11 +179,11 @@ def _is_initialized(cursor):
"""
Check that database is initialized
"""
cursor.execute('SELECT EXISTS(SELECT 1 FROM '
'information_schema.tables '
'WHERE table_schema = %s '
'AND table_name = %s)',
('public', 'schema_version'))
cursor.execute(
'SELECT EXISTS(SELECT 1 FROM '
'information_schema.tables '
'WHERE table_schema = %s '
'AND table_name = %s)', ('public', 'schema_version'))
table_exists = cursor.fetchone()[0]
if not table_exists:
@@ -196,20 +201,17 @@ def _is_initialized(cursor):
return True
MIGRATION_FILE_RE = re.compile(
r'V(?P<version>\d+)__(?P<description>.+)\.sql$',
)
MIGRATION_FILE_RE = re.compile(r'V(?P<version>\d+)__(?P<description>.+)\.sql$')
MigrationInfo = namedtuple('MigrationInfo', ('meta', 'file_path'))
Callbacks = namedtuple('Callbacks', ('beforeAll', 'beforeEach',
'afterEach', 'afterAll'))
Callbacks = namedtuple('Callbacks',
('beforeAll', 'beforeEach', 'afterEach', 'afterAll'))
Config = namedtuple('Config', ('target', 'baseline', 'cursor', 'dryrun',
'callbacks', 'user', 'base_dir', 'conn',
'session', 'conn_instance',
'terminator_instance', 'termination_interval'))
Config = namedtuple('Config',
('target', 'baseline', 'cursor', 'dryrun', 'callbacks',
'user', 'base_dir', 'conn', 'session', 'conn_instance',
'terminator_instance', 'termination_interval'))
CONFIG_IGNORE = ['cursor', 'conn_instance', 'terminator_instance']
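The same hunk collapses the migration filename pattern onto a single line; it expects names of the form V<version>__<description>.sql. A tiny illustrative check, not part of this commit (the filename below is made up):

# Illustrative sketch only, not from the commit.
import re

MIGRATION_FILE_RE = re.compile(r'V(?P<version>\d+)__(?P<description>.+)\.sql$')

match = MIGRATION_FILE_RE.match('V0042__add_users_table.sql')
assert match is not None
print(int(match.group('version')))   # 42
print(match.group('description'))    # add_users_table
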
@@ -236,9 +238,8 @@ def _get_migrations_info_from_dir(base_dir):
for fname, file_path in _get_files_from_dir(path):
match = MIGRATION_FILE_RE.match(fname)
if match is None:
LOG.warning(
'File %s does not match by pattern %s. Skipping it.',
file_path, MIGRATION_FILE_RE.pattern)
LOG.warning('File %s does not match by pattern %s. Skipping it.',
file_path, MIGRATION_FILE_RE.pattern)
continue
version = int(match.group('version'))
ret = dict(
@@ -254,13 +255,13 @@ def _get_migrations_info_from_dir(base_dir):
file_path,
)
if version in migrations:
raise MalformedMigration((
'Found migrations with same version: {version} '
'\nfirst : {first_path}'
'\nsecond: {second_path}').format(
version=version,
first_path=migration.file_path,
second_path=migrations[version].file_path))
raise MalformedMigration(
('Found migrations with same version: {version} '
'\nfirst : {first_path}'
'\nsecond: {second_path}').format(
version=version,
first_path=migration.file_path,
second_path=migrations[version].file_path))
migrations[version] = migration
return migrations
@@ -274,12 +275,14 @@ def _get_migrations_info(base_dir, baseline_v, target_v):
target = target_v if target_v is not None else float('inf')
for version, ret in _get_migrations_info_from_dir(base_dir).items():
if version > baseline_v and version <= target:
if baseline_v < version <= target:
migrations[version] = ret.meta
else:
LOG.info(
'Ignore migration %r cause baseline: %r or target: %r',
ret, baseline_v, target,
ret,
baseline_v,
target,
)
return migrations
@@ -289,9 +292,8 @@ def _get_info(base_dir, baseline_v, target_v, cursor):
Get migrations info from database and base dir
"""
ret = {}
cursor.execute(
'SELECT {columns} FROM public.schema_version'.format(
columns=', '.join(REF_COLUMNS)))
cursor.execute('SELECT {columns} FROM public.schema_version'.format(
columns=', '.join(REF_COLUMNS)))
for i in cursor.fetchall():
version = {}
for j in enumerate(REF_COLUMNS):
@@ -333,9 +335,9 @@ def _set_baseline(baseline_v, user, cursor):
"""
Cleanup schema_version and set baseline
"""
cursor.execute('SELECT EXISTS(SELECT 1 FROM public'
'.schema_version WHERE version >= %s::bigint)',
(baseline_v,))
cursor.execute(
'SELECT EXISTS(SELECT 1 FROM public'
'.schema_version WHERE version >= %s::bigint)', (baseline_v, ))
check_failed = cursor.fetchone()[0]
if check_failed:
@@ -348,11 +350,11 @@ def _set_baseline(baseline_v, user, cursor):
LOG.info(cursor.statusmessage)
LOG.info('setting baseline')
cursor.execute('INSERT INTO public.schema_version '
'(version, type, description, installed_by) '
'VALUES (%s::bigint, %s, %s, %s)',
(text(baseline_v), 'manual',
'Forced baseline', user))
cursor.execute(
'INSERT INTO public.schema_version '
'(version, type, description, installed_by) '
'VALUES (%s::bigint, %s, %s, %s)',
(text(baseline_v), 'manual', 'Forced baseline', user))
LOG.info(cursor.statusmessage)
@@ -361,18 +363,20 @@ def _init_schema(cursor):
Create schema_version table
"""
LOG.info('creating type schema_version_type')
cursor.execute('CREATE TYPE public.schema_version_type '
'AS ENUM (%s, %s)', ('auto', 'manual'))
cursor.execute(
'CREATE TYPE public.schema_version_type '
'AS ENUM (%s, %s)', ('auto', 'manual'))
LOG.info(cursor.statusmessage)
LOG.info('creating table schema_version')
cursor.execute('CREATE TABLE public.schema_version ('
'version BIGINT NOT NULL PRIMARY KEY, '
'description TEXT NOT NULL, '
'type public.schema_version_type NOT NULL '
'DEFAULT %s, '
'installed_by TEXT NOT NULL, '
'installed_on TIMESTAMP WITHOUT time ZONE '
'DEFAULT now() NOT NULL)', ('auto',))
cursor.execute(
'CREATE TABLE public.schema_version ('
'version BIGINT NOT NULL PRIMARY KEY, '
'description TEXT NOT NULL, '
'type public.schema_version_type NOT NULL '
'DEFAULT %s, '
'installed_by TEXT NOT NULL, '
'installed_on TIMESTAMP WITHOUT time ZONE '
'DEFAULT now() NOT NULL)', ('auto', ))
LOG.info(cursor.statusmessage)
@@ -387,8 +391,7 @@ def _get_statements(path):
data.encode('ascii')
except UnicodeError as exc:
raise MalformedStatement(
'Non ascii symbols in file: {0}, {1}'.format(
path, text(exc)))
'Non ascii symbols in file: {0}, {1}'.format(path, text(exc)))
data = sqlparse.format(data, strip_comments=True)
for statement in sqlparse.parsestream(data, encoding='utf-8'):
st_str = text(statement).strip().encode('utf-8')
@@ -431,12 +434,11 @@ def _apply_version(version, base_dir, user, cursor):
LOG.info('Try apply version %r', version_info)
_apply_file(version_info.file_path, cursor)
cursor.execute('INSERT INTO public.schema_version '
'(version, description, installed_by) '
'VALUES (%s::bigint, %s, %s)',
(text(version),
version_info.meta['description'],
user))
cursor.execute(
'INSERT INTO public.schema_version '
'(version, description, installed_by) '
'VALUES (%s::bigint, %s, %s)',
(text(version), version_info.meta['description'], user))
def _parse_str_callbacks(callbacks, ret, base_dir):
@@ -486,10 +488,7 @@ def _get_callbacks(callbacks, base_dir=''):
"""
Parse cmdline/config callbacks
"""
ret = Callbacks(beforeAll=[],
beforeEach=[],
afterEach=[],
afterAll=[])
ret = Callbacks(beforeAll=[], beforeEach=[], afterEach=[], afterAll=[])
if isinstance(callbacks, dict):
return _parse_dict_callbacks(callbacks, ret, base_dir)
return _parse_str_callbacks(callbacks, ret, base_dir)
@@ -551,8 +550,8 @@ def info(config, stdout=True):
"""
Info cmdline wrapper
"""
state = _get_state(config.base_dir, config.baseline,
config.target, config.cursor)
state = _get_state(config.base_dir, config.baseline, config.target,
config.cursor)
if stdout:
out_state = OrderedDict()
for version in sorted(state, key=int):
@@ -592,20 +591,22 @@ def baseline(config):
def _prepare_nontransactional_steps(state, callbacks):
steps = []
i = {'state': {},
'cbs': _get_callbacks('')}
i = {'state': {}, 'cbs': _get_callbacks('')}
for version in sorted(state):
if not state[version]['transactional']:
if i['state']:
steps.append(i)
i = {'state': {},
'cbs': _get_callbacks('')}
i = {'state': {}, 'cbs': _get_callbacks('')}
elif not steps:
LOG.error('First migration MUST be transactional')
raise MalformedMigration('First migration MUST '
'be transactional')
steps.append({'state': {version: state[version]},
'cbs': _get_callbacks('')})
steps.append({
'state': {
version: state[version],
},
'cbs': _get_callbacks(''),
})
else:
i['state'][version] = state[version]
i['cbs'] = callbacks
@@ -641,8 +642,8 @@ def _execute_mixed_steps(config, steps, nt_conn):
else:
cur = config.cursor
commit_req = True
_migrate_step(step['state'], step['cbs'],
config.base_dir, config.user, cur)
_migrate_step(step['state'], step['cbs'], config.base_dir, config.user,
cur)
def migrate(config):
@@ -654,15 +655,14 @@ def migrate(config):
'use latest available version)')
raise MigrateError('Unknown target')
state = _get_state(config.base_dir, config.baseline,
config.target, config.cursor)
state = _get_state(config.base_dir, config.baseline, config.target,
config.cursor)
not_applied = [x for x in state if state[x]['installed_on'] is None]
non_trans = [x for x in not_applied if not state[x]['transactional']]
if non_trans:
if config.dryrun:
LOG.error('Dry run for nontransactional migrations '
'is nonsence')
LOG.error('Dry run for nontransactional migrations ' 'is nonsence')
raise MigrateError('Dry run for nontransactional migrations '
'is nonsence')
if len(state) != len(not_applied):
@@ -675,8 +675,8 @@ def migrate(config):
with closing(_create_connection(config)) as nt_conn:
nt_conn.autocommit = True
cursor = _init_cursor(nt_conn, config.session)
_migrate_step(state, _get_callbacks(''),
config.base_dir, config.user, cursor)
_migrate_step(state, _get_callbacks(''), config.base_dir,
config.user, cursor)
if config.terminator_instance:
config.terminator_instance.remove_conn(nt_conn)
else:
@@ -690,8 +690,8 @@ def migrate(config):
if config.terminator_instance:
config.terminator_instance.remove_conn(nt_conn)
else:
_migrate_step(state, config.callbacks, config.base_dir,
config.user, config.cursor)
_migrate_step(state, config.callbacks, config.base_dir, config.user,
config.cursor)
_finish(config)
@@ -703,14 +703,20 @@ COMMANDS = {
'migrate': migrate,
}
CONFIG_DEFAULTS = Config(target=None, baseline=0, cursor=None, dryrun=False,
callbacks='', base_dir='', user=None,
session=['SET lock_timeout = 0'],
conn='dbname=postgres user=postgres '
'connect_timeout=1',
conn_instance=None,
terminator_instance=None,
termination_interval=None)
CONFIG_DEFAULTS = Config(
target=None,
baseline=0,
cursor=None,
dryrun=False,
callbacks='',
base_dir='',
user=None,
session=['SET lock_timeout = 0'],
conn='dbname=postgres user=postgres '
'connect_timeout=1',
conn_instance=None,
terminator_instance=None,
termination_interval=None)
def get_config(base_dir, args=None):
@@ -741,14 +747,14 @@ def get_config(base_dir, args=None):
if conf.termination_interval and not conf.dryrun:
conf = conf._replace(
terminator_instance=ConflictTerminator(
conf.conn, conf.termination_interval))
terminator_instance=ConflictTerminator(conf.conn,
conf.termination_interval))
conf.terminator_instance.start()
conf = conf._replace(conn_instance=_create_connection(conf))
conf = conf._replace(cursor=_init_cursor(conf.conn_instance, conf.session))
conf = conf._replace(callbacks=_get_callbacks(conf.callbacks,
conf.base_dir))
conf = conf._replace(
callbacks=_get_callbacks(conf.callbacks, conf.base_dir))
if conf.user is None:
conf = conf._replace(user=_get_database_user(conf.cursor))
@@ -764,47 +770,46 @@ def _main():
"""
parser = argparse.ArgumentParser()
parser.add_argument('cmd',
choices=COMMANDS.keys(),
type=str,
help='Operation')
parser.add_argument('-t', '--target',
type=str,
help='Target version')
parser.add_argument('-c', '--conn',
type=str,
help='Postgresql connection string')
parser.add_argument('-d', '--base_dir',
type=str,
default='',
help='Migrations base dir')
parser.add_argument('-u', '--user',
type=str,
help='Override database user in migration info')
parser.add_argument('-b', '--baseline',
type=int,
help='Baseline version')
parser.add_argument('-a', '--callbacks',
type=str,
help='Comma-separated list of callbacks '
'(type:dir/file)')
parser.add_argument('-s', '--session',
action='append',
help='Session setup (e.g. isolation level)')
parser.add_argument('-n', '--dryrun',
action='store_true',
help='Say "rollback" in the end instead of "commit"')
parser.add_argument('-l', '--termination_interval',
type=float,
help='Inverval for terminating blocking pids')
parser.add_argument('-v', '--verbose',
default=0,
action='count',
help='Be verbose')
parser.add_argument(
'cmd', choices=COMMANDS.keys(), type=str, help='Operation')
parser.add_argument('-t', '--target', type=str, help='Target version')
parser.add_argument(
'-c', '--conn', type=str, help='Postgresql connection string')
parser.add_argument(
'-d', '--base_dir', type=str, default='', help='Migrations base dir')
parser.add_argument(
'-u',
'--user',
type=str,
help='Override database user in migration info')
parser.add_argument('-b', '--baseline', type=int, help='Baseline version')
parser.add_argument(
'-a',
'--callbacks',
type=str,
help='Comma-separated list of callbacks '
'(type:dir/file)')
parser.add_argument(
'-s',
'--session',
action='append',
help='Session setup (e.g. isolation level)')
parser.add_argument(
'-n',
'--dryrun',
action='store_true',
help='Say "rollback" in the end instead of "commit"')
parser.add_argument(
'-l',
'--termination_interval',
type=float,
help='Inverval for terminating blocking pids')
parser.add_argument(
'-v', '--verbose', default=0, action='count', help='Be verbose')
args = parser.parse_args()
logging.basicConfig(
level=(logging.ERROR - 10*(min(3, args.verbose))),
level=(logging.ERROR - 10 * (min(3, args.verbose))),
format='%(asctime)s %(levelname)-8s: %(message)s')
config = get_config(args.base_dir, args)
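The reformatted log level calculation is a whitespace-only change, the math is unchanged. For reference, a quick worked check of what repeated -v flags map to, using just the standard logging constants (not part of this commit):

# Illustrative sketch only, not from the commit.
import logging

for verbose in range(5):
    # logging.ERROR is 40; each -v subtracts 10, capped at three repetitions
    level = logging.ERROR - 10 * min(3, verbose)
    print(verbose, logging.getLevelName(level))
# 0 ERROR, 1 WARNING, 2 INFO, 3 DEBUG, 4 DEBUG
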

tox.ini

@@ -4,7 +4,7 @@
# and then run "tox" from this directory.
[tox]
envlist = py27, py35, py36, flake8, pylint
envlist = py27, py36, flake8, pylint, yapf
[testenv:py27]
whitelist_externals = rm
@@ -19,18 +19,6 @@ deps = behave
coverage
func_timeout
[testenv:py35]
whitelist_externals = rm
commands = rm -rf htmlcov
coverage erase
coverage run -p --include=pgmigrate.py {envbindir}/behave -q
coverage combine
coverage html pgmigrate.py
coverage report --fail-under=100 pgmigrate.py
deps = behave
coverage
func_timeout
[testenv:py36]
whitelist_externals = rm
commands = rm -rf htmlcov
@@ -58,6 +46,10 @@ deps = flake8
commands = pylint pgmigrate.py
deps = pylint
[testenv:yapf]
commands = yapf -pd pgmigrate.py
deps = yapf==0.22.0
[flake8]
copyright-check = True
copyright-regexp = Copyright\s+(\(C\)\s+)?(\d{4}-)?2016-2018\s+%(author)s
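The new yapf env above runs "yapf -pd pgmigrate.py": -d prints the diff yapf would apply and exits non-zero when reformatting is needed, -p runs in parallel, so the env acts as a formatting gate. Outside of tox, roughly the same check could be scripted as below (a sketch, not part of this commit; assumes Python 3 and an installed yapf, e.g. the pinned yapf==0.22.0):

# Illustrative sketch only, not from the commit: run the same check the
# tox yapf env performs and propagate its exit status.
import subprocess
import sys

result = subprocess.run(['yapf', '-pd', 'pgmigrate.py'])
sys.exit(result.returncode)  # non-zero means the file is not yapf-clean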