mirror of
https://github.com/hasura/graphql-engine.git
synced 2024-12-15 17:31:56 +03:00
7c452bfca1
When running using the "new" style (with a HGE binary, not a URL), a new PostgreSQL metadata and source database are created for each test. When we get this into CI, this should drastically reduce the flakiness. I have also enabled parallelization by default when using `run-new.sh`. It's much faster. I had to basically rewrite _server/tests-py/test_graphql_read_only_source.py_ so that it does two different things depending on how it's run. It's unfortunate, but it should eventually go away. PR-URL: https://github.com/hasura/graphql-engine-mono/pull/6879 GitOrigin-RevId: a121b9035f8da3e61a3e36d8b1fbc6ccae918fad
219 lines
7.8 KiB
Python
219 lines
7.8 KiB
Python
import pytest
|
|
import re
|
|
import sqlalchemy
|
|
import threading
|
|
import time
|
|
from typing import Any, Optional
|
|
import urllib.parse
|
|
import zlib
|
|
|
|
from context import HGECtx
|
|
|
|
|
|
class Backend:
    """A named PostgreSQL database together with the engine used to reach it.

    Attributes are plain and public:
      name   -- the database name given by the caller
      url    -- the engine's URL rendered with `str()`
      engine -- the SQLAlchemy engine itself
      kind   -- always 'postgres'
    """

    def __init__(self, name: str, engine: sqlalchemy.engine.Engine):
        self.kind = 'postgres'
        self.engine = engine
        # Rendered once so that the URL can be handed around as a plain string.
        self.url = str(engine.url)
        self.name = name

    def __repr__(self):
        class_name = self.__class__.__name__
        return f'{class_name}(name={self.name!r}, url={self.url!r})'

    @property
    def connection_parameters(self):
        """Break `self.url` into host, username, port, and database name."""
        pieces = urllib.parse.urlparse(self.url)
        # The URL path is "/<database>"; drop the single leading slash, if present.
        path = pieces.path
        database = path[1:] if path.startswith('/') else path
        return {
            "host": pieces.hostname,
            "username": pieces.username,
            "port": pieces.port,
            "database": database,
        }
|
|
|
|
|
|
# Switch to a new schema using the same connection details as the given engine.
|
|
def switch_schema(engine: sqlalchemy.engine.Engine, new_schema: str):
    """Return a new engine that reuses `engine`'s URL with its path replaced.

    The path component of the URL is set to `/<new_schema>`; the scheme,
    credentials, host, and port are carried over from the given engine.
    """
    original_parts = urllib.parse.urlparse(str(engine.url))
    retargeted_parts = original_parts._replace(path = f'/{new_schema}')
    retargeted_url = urllib.parse.urlunparse(retargeted_parts)
    return sqlalchemy.engine.create_engine(retargeted_url)
|
|
|
|
|
|
# Acquire a single SQLAlchemy engine for the entire session.
|
|
def owner_engine(request: pytest.FixtureRequest) -> sqlalchemy.engine.Engine:
    """Build an engine from the first URL supplied through `--pg-urls`."""
    configured_urls = request.config.getoption('--pg-urls') # type: ignore
    first_url: str = configured_urls[0] # type: ignore
    return sqlalchemy.engine.create_engine(first_url)
|
|
|
|
|
|
# Create a new user to run the tests. This user is *not* a superuser.
|
|
# Otherwise we cannot remove database permissions from HGE.
|
|
def runner_engine(owner_engine: sqlalchemy.engine.Engine) -> sqlalchemy.engine.Engine:
    """Ensure the test-runner role exists and return an engine that logs in as it.

    The role is created through `owner_engine`. The returned engine uses the
    owner's scheme, host, and port, but with the runner's credentials and the
    database set to "postgres".
    """
    user_name = 'hasura_server_tests_py'
    password = 'password'

    with owner_engine.connect() as connection:
        # Effectively, this emulates `CREATE USER IF NOT EXISTS`: a database error
        # (typically "role already exists") is tolerated. Only SQLAlchemy errors
        # are swallowed, so that interrupts and programming mistakes still surface.
        try:
            connection.execute(f"CREATE USER {user_name} WITH PASSWORD '{password}'")
        except sqlalchemy.exc.SQLAlchemyError:
            pass

    parsed_pg_url = urllib.parse.urlparse(str(owner_engine.url))
    # Everything after the last "@" is "host[:port]". When the owner URL carries
    # no credentials at all, the whole netloc is already "host[:port]", so the
    # runner's credentials are still applied instead of being silently dropped.
    host_and_port = parsed_pg_url.netloc.rpartition('@')[2]
    # Create a new URL with the new username, and the database set to "postgres".
    # This is fine for connecting; you can't do anything with it.
    test_pg_url = urllib.parse.urlunparse(parsed_pg_url._replace(
        netloc = f'{user_name}:{password}@{host_and_port}',
        path = '/postgres',
    ))

    return sqlalchemy.engine.create_engine(test_pg_url)
|
|
|
|
|
|
def create_schema(
    request: pytest.FixtureRequest,
    owner_engine: sqlalchemy.engine.Engine,
    runner_engine: sqlalchemy.engine.Engine,
    prefix: str,
) -> Backend:
    """Create a fresh database for the requesting test and wrap it in a `Backend`.

    The database name is derived from `prefix` and the test node's name. Any
    database already using that name is dropped first, and a finalizer is
    registered on `request` to drop the new one in the background afterwards.
    The runner's user is granted all privileges on the database, on its
    `public` schema, and on a newly created `hge_tests` schema. The returned
    backend connects with the runner's credentials.
    """
    test_name = request.node.name.lower().replace('.', '_')
    schema_name = f'test_{prefix}_{test_name}'
    # PostgreSQL truncates database names to 63 characters.
    if len(schema_name) > 63:
        # use a quick, short, insecure hash
        checksum = zlib.adler32(test_name.encode('ascii'))
        schema_name = f'test_{prefix}_{checksum:x}'

    # Start from a clean slate in case an earlier run left the database behind.
    drop_schema(owner_engine, schema_name)

    def drop_later():
        drop_schema_in_background(owner_engine, schema_name)

    with owner_engine.connect() as connection:
        connection.execute('COMMIT') # Exit the implicit transaction.
        connection.execute(f'CREATE DATABASE {schema_name}')
        request.addfinalizer(drop_later)

    runner_user = runner_engine.url.username
    setup_statements = [
        f'GRANT ALL PRIVILEGES ON DATABASE {schema_name} TO {runner_user}',
        f'GRANT ALL PRIVILEGES ON SCHEMA public TO {runner_user}',
        'CREATE SCHEMA hge_tests',
        f'GRANT ALL PRIVILEGES ON SCHEMA hge_tests TO {runner_user}',
    ]
    # The grants are issued by the owner, connected to the new database itself.
    with switch_schema(owner_engine, schema_name).connect() as connection:
        for statement in setup_statements:
            connection.execute(statement)

    return Backend(name = schema_name, engine = switch_schema(runner_engine, schema_name))
|
|
|
|
|
|
# Dropping the database can be tricky because we cannot guarantee fixture shutdown order.
|
|
# Perhaps the GraphQL Engine is still using it.
|
|
# To avoid this, we try a few times in a background thread, allowing other cleanup to continue.
|
|
def drop_schema_in_background(engine: sqlalchemy.engine.Engine, name: str):
    """Start a thread that tries up to three times to drop the database `name`.

    The thread is started and not joined, so the caller continues immediately.
    """
    def attempt_drop():
        drop_schema(engine, name)

    def drop_with_retries():
        retry(f'DROP DATABASE {name}', attempt_drop, tries = 3)

    worker = threading.Thread(target = drop_with_retries)
    worker.start()
|
|
|
|
|
|
def drop_schema(engine: sqlalchemy.engine.Engine, name: str):
    """Drop the database `name` through `engine`, if it exists."""
    statements = (
        'COMMIT', # Exit the implicit transaction.
        f'DROP DATABASE IF EXISTS {name}',
    )
    with engine.connect() as connection:
        for statement in statements:
            connection.execute(statement)
|
|
|
|
|
|
def retry(message, f, tries = 1, delay = 3):
    """Call `f` until it succeeds or the attempts run out, reporting progress.

    Parameters:
      message -- a description of the action, used in the printed progress lines
      f       -- a zero-argument callable; raising an exception counts as failure
      tries   -- the maximum number of attempts; values below 1 are treated as 1
      delay   -- seconds to sleep between attempts (applied to every retry)

    Returns None in all cases. A final failure is reported with a printed
    message and is not re-raised, because this is best-effort.
    """
    # Always make at least one attempt, even if a caller passes `tries <= 0`.
    attempts_left = max(tries, 1)
    while True:
        print(f'Attempting to {message}...')
        try:
            f()
            return
        # `Exception` rather than a bare `except`, so that KeyboardInterrupt and
        # SystemExit are not swallowed and retried.
        except Exception:
            attempts_left -= 1
            if attempts_left == 0:
                print(f'Failed to {message}. Giving up.')
                return
            print(f'Failed to {message}. Waiting {delay} seconds then trying again.')
            time.sleep(delay)
|
|
|
|
|
|
def metadata_schema_url(
    request: pytest.FixtureRequest,
    owner_engine: sqlalchemy.engine.Engine,
    runner_engine: sqlalchemy.engine.Engine,
):
    """Create a database with the 'metadata' prefix and return its URL string."""
    metadata_backend = create_schema(request, owner_engine, runner_engine, 'metadata')
    return metadata_backend.url
|
|
|
|
|
|
def new_source(
    request: pytest.FixtureRequest,
    owner_engine: sqlalchemy.engine.Engine,
    runner_engine: sqlalchemy.engine.Engine,
    hge_ctx: HGECtx,
    name: Optional[str] = None,
    customization: Any = {},
):
    """Generator that provides a freshly created database registered as a source.

    A database is created for the source, added to the server's metadata under
    `name` (or 'default' when no name is given), and yielded as a `Backend`.
    When the generator is resumed, the source is removed from the metadata.
    """
    source_name = name if name else 'default'
    backend = create_schema(request, owner_engine, runner_engine, f'source_{source_name}')
    add_source(hge_ctx, backend, source_name, customization)

    yield backend

    # Runs once the consumer resumes the generator.
    drop_source(hge_ctx, source_name)
|
|
|
|
|
|
def add_source(hge_ctx: HGECtx, backend: Backend, source_name: str, customization: Any = {}):
    """Append a source pointing at `backend` to the server's metadata.

    The current metadata is exported, a new source entry with no tables is
    appended to its `sources` list, and the whole document is sent back with
    `replace_metadata`. An empty `source_name` falls back to 'default'.
    """
    export_request = {
        'type': 'export_metadata',
        'args': {},
    }
    exported = hge_ctx.v1metadataq(export_request)

    source_entry = {
        'name': source_name or 'default',
        'kind': backend.kind,
        'configuration': {
            'connection_info': {
                'database_url': backend.url,
            },
        },
        'customization': customization,
        'tables': [],
    }
    exported['sources'].append(source_entry)

    replace_request = {
        'type': 'replace_metadata',
        'args': exported,
    }
    hge_ctx.v1metadataq(replace_request)
|
|
|
|
|
|
def drop_source(hge_ctx: HGECtx, source_name: str):
    """Remove every source named `source_name` from the server's metadata.

    The current metadata is exported, the matching entries are filtered out of
    its `sources` list, and the result is sent back with `replace_metadata`.
    """
    exported = hge_ctx.v1metadataq({'type': 'export_metadata', 'args': {}})

    remaining_sources = []
    for entry in exported['sources']:
        if entry['name'] != source_name:
            remaining_sources.append(entry)
    exported['sources'] = remaining_sources

    hge_ctx.v1metadataq({'type': 'replace_metadata', 'args': exported})
|
|
|
|
|
|
def source_backend(
    request: pytest.FixtureRequest,
    owner_engine: sqlalchemy.engine.Engine,
    runner_engine: sqlalchemy.engine.Engine,
    hge_ctx: HGECtx,
):
    """Generator that yields the default source's backend, or None when disabled.

    A test marked with `default_source_disabled` receives None and no source is
    created; otherwise this delegates entirely to `new_source`.
    """
    if request.node.get_closest_marker('default_source_disabled'):
        yield None
        return

    yield from new_source(request, owner_engine, runner_engine, hge_ctx)
|
|
|
|
|
|
def version(engine: sqlalchemy.engine.Engine) -> int:
    """Return the server's `server_version_num` integer-divided by 10000.

    Raises an Exception when the query returns no row.
    """
    with engine.connect() as connection:
        result = connection.execute('show server_version_num')
        row = result.fetchone()
        if row:
            version_num = int(row['server_version_num'])
            return version_num // 10000
        raise Exception('Could not get the PostgreSQL version.')
|
|
|
|
|
|
def postgis(owner_engine: sqlalchemy.engine.Engine, source_backend: Optional[Backend]):
    """Install the PostGIS extensions, connecting with the owner's credentials.

    When `source_backend` is given, the extensions go into that backend's
    database; otherwise they go into whatever database `owner_engine` targets.
    `postgis_raster` is additionally installed when the reported PostGIS
    library version starts with "3.".

    Raises an Exception when the PostGIS version query returns no row.
    """
    # TODO: remove once parallelization work is completed
    # `source_backend` will no longer be optional
    if source_backend:
        engine = switch_schema(owner_engine, source_backend.name)
    else:
        engine = owner_engine

    base_extension_statements = (
        'CREATE EXTENSION IF NOT EXISTS postgis',
        'CREATE EXTENSION IF NOT EXISTS postgis_topology',
    )
    with engine.connect() as connection:
        for statement in base_extension_statements:
            connection.execute(statement)

        version_row = connection.execute('SELECT PostGIS_lib_version() as postgis_version').fetchone()
        if not version_row:
            raise Exception('Could not detect the PostGIS version.')

        postgis_version: str = version_row['postgis_version']
        is_version_3 = re.match('^3\\.', postgis_version)
        if is_version_3:
            connection.execute('CREATE EXTENSION IF NOT EXISTS postgis_raster')
|