2015-01-08 00:30:20 +03:00
|
|
|
import pytest
|
2015-02-09 01:54:57 +03:00
|
|
|
import psycopg2
|
2015-03-01 04:54:03 +03:00
|
|
|
import psycopg2.extras
|
2015-01-08 01:17:55 +03:00
|
|
|
from pgcli.main import format_output
|
2015-03-01 04:54:03 +03:00
|
|
|
from pgcli.pgexecute import register_json_typecasters
|
2015-01-08 00:30:20 +03:00
|
|
|
|
|
|
|
# TODO: should this be somehow be divined from environment?
|
|
|
|
POSTGRES_USER, POSTGRES_HOST = 'postgres', 'localhost'
|
|
|
|
|
|
|
|
|
|
|
|
def db_connection(dbname=None):
|
2015-05-14 18:40:27 +03:00
|
|
|
conn = psycopg2.connect(user=POSTGRES_USER, host=POSTGRES_HOST, database=dbname)
|
2015-01-08 00:30:20 +03:00
|
|
|
conn.autocommit = True
|
|
|
|
return conn
|
|
|
|
|
2015-03-01 04:54:03 +03:00
|
|
|
|
2015-01-08 00:30:20 +03:00
|
|
|
try:
|
2015-03-01 04:54:03 +03:00
|
|
|
conn = db_connection()
|
2015-01-08 00:30:20 +03:00
|
|
|
CAN_CONNECT_TO_DB = True
|
2015-03-01 04:54:03 +03:00
|
|
|
SERVER_VERSION = conn.server_version
|
|
|
|
json_types = register_json_typecasters(conn, lambda x: x)
|
|
|
|
JSON_AVAILABLE = 'json' in json_types
|
|
|
|
JSONB_AVAILABLE = 'jsonb' in json_types
|
2015-01-08 00:30:20 +03:00
|
|
|
except:
|
2015-03-01 04:54:03 +03:00
|
|
|
CAN_CONNECT_TO_DB = JSON_AVAILABLE = JSONB_AVAILABLE = False
|
|
|
|
SERVER_VERSION = 0
|
|
|
|
|
2015-01-08 00:30:20 +03:00
|
|
|
|
|
|
|
dbtest = pytest.mark.skipif(
|
|
|
|
not CAN_CONNECT_TO_DB,
|
|
|
|
reason="Need a postgres instance at localhost accessible by user 'postgres'")
|
|
|
|
|
|
|
|
|
2015-03-01 04:54:03 +03:00
|
|
|
requires_json = pytest.mark.skipif(
|
|
|
|
not JSON_AVAILABLE,
|
|
|
|
reason='Postgres server unavailable or json type not defined')
|
|
|
|
|
|
|
|
|
|
|
|
requires_jsonb = pytest.mark.skipif(
|
|
|
|
not JSONB_AVAILABLE,
|
|
|
|
reason='Postgres server unavailable or jsonb type not defined')
|
|
|
|
|
|
|
|
|
2015-01-08 00:30:20 +03:00
|
|
|
def create_db(dbname):
|
|
|
|
with db_connection().cursor() as cur:
|
|
|
|
try:
|
|
|
|
cur.execute('''CREATE DATABASE _test_db''')
|
|
|
|
except:
|
|
|
|
pass
|
|
|
|
|
|
|
|
|
|
|
|
def drop_tables(conn):
|
|
|
|
with conn.cursor() as cur:
|
2015-01-19 02:27:25 +03:00
|
|
|
cur.execute('''
|
|
|
|
DROP SCHEMA public CASCADE;
|
|
|
|
CREATE SCHEMA public;
|
|
|
|
DROP SCHEMA IF EXISTS schema1 CASCADE;
|
|
|
|
DROP SCHEMA IF EXISTS schema2 CASCADE''')
|
2015-01-08 01:17:55 +03:00
|
|
|
|
|
|
|
|
2015-06-30 14:35:53 +03:00
|
|
|
def run(executor, sql, join=False, expanded=False, pgspecial=None):
|
2015-01-08 01:17:55 +03:00
|
|
|
" Return string output for the sql to be run "
|
2015-01-08 12:58:02 +03:00
|
|
|
result = []
|
2015-06-30 14:35:53 +03:00
|
|
|
for title, rows, headers, status in executor.run(sql, pgspecial):
|
2015-06-25 14:03:50 +03:00
|
|
|
result.extend(format_output(title, rows, headers, status, 'psql',
|
|
|
|
expanded=expanded))
|
2015-01-08 12:58:02 +03:00
|
|
|
if join:
|
|
|
|
result = '\n'.join(result)
|
|
|
|
return result
|