import pytest import re import sqlalchemy import threading import time from typing import Any, Optional import urllib.parse import zlib from context import HGECtx class Backend: def __init__(self, name: str, engine: sqlalchemy.engine.Engine): self.name = name self.url = str(engine.url) self.engine = engine self.kind = 'postgres' def __repr__(self): return f'{self.__class__.__name__}(name={self.name!r}, url={self.url!r})' @property def connection_parameters(self): parsed = urllib.parse.urlparse(self.url) return { "host": parsed.hostname, "username": parsed.username, "port": parsed.port, "database": re.sub('^/', '', parsed.path), } # Switch to a new schema using the same connection details as the given engine. def switch_schema(engine: sqlalchemy.engine.Engine, new_schema: str): parsed_url = urllib.parse.urlparse(str(engine.url)) new_url = urllib.parse.urlunparse(parsed_url._replace(path = f'/{new_schema}')) return sqlalchemy.engine.create_engine(new_url) # Acquire a single SQLAlchemy engine for the entire session. def owner_engine(request: pytest.FixtureRequest) -> sqlalchemy.engine.Engine: pg_url: str = request.config.getoption('--pg-urls')[0] # type: ignore return sqlalchemy.engine.create_engine(pg_url) # Create a new user to run the tests. This user is *not* a superuser. # Otherwise we cannot remove database permissions from HGE. def runner_engine(owner_engine: sqlalchemy.engine.Engine) -> sqlalchemy.engine.Engine: user_name = 'hasura_server_tests_py' with owner_engine.connect() as connection: # Effectively, this emulates `CREATE USER IF NOT EXISTS`. try: connection.execute(f"CREATE USER {user_name} WITH PASSWORD 'password'") except: pass parsed_pg_url = urllib.parse.urlparse(str(owner_engine.url)) # Create a new URL with the new username, and the database set to "postgres". # This is fine for connecting; you can't do anything with it. test_pg_url = urllib.parse.urlunparse(parsed_pg_url._replace(netloc = re.sub('^.+@', f'{user_name}:password@', parsed_pg_url.netloc), path = '/postgres')) return sqlalchemy.engine.create_engine(test_pg_url) def create_schema( request: pytest.FixtureRequest, owner_engine: sqlalchemy.engine.Engine, runner_engine: sqlalchemy.engine.Engine, prefix: str, ) -> Backend: name = request.node.name.lower().replace('.', '_') schema_name = f'test_{prefix}_{name}' # PostgreSQL truncates database names to 63 characters. if len(schema_name) >= 64: # use a quick, short, insecure hash hash = zlib.adler32(name.encode('ascii')) schema_name = f'test_{prefix}_{hash:x}' drop_schema(owner_engine, schema_name) with owner_engine.connect() as connection: connection.execute('COMMIT') # Exit the implicit transaction. connection.execute(f'CREATE DATABASE {schema_name}') request.addfinalizer(lambda: drop_schema_in_background(owner_engine, schema_name)) with switch_schema(owner_engine, schema_name).connect() as connection: connection.execute(f'GRANT ALL PRIVILEGES ON DATABASE {schema_name} TO {runner_engine.url.username}') connection.execute(f'GRANT ALL PRIVILEGES ON SCHEMA public TO {runner_engine.url.username}') connection.execute('CREATE SCHEMA hge_tests') connection.execute(f'GRANT ALL PRIVILEGES ON SCHEMA hge_tests TO {runner_engine.url.username}') engine = switch_schema(runner_engine, schema_name) return Backend(name = schema_name, engine = engine) # Dropping the database can be tricky because we cannot guarantee fixture shutdown order. # Perhaps the GraphQL Engine is still using it. # To avoid this, we try a few times in a background thread, allowing other cleanup to continue. def drop_schema_in_background(engine: sqlalchemy.engine.Engine, name: str): t = threading.Thread(target = lambda: retry(f'DROP DATABASE {name}', lambda: drop_schema(engine, name), tries = 3)) t.start() def drop_schema(engine: sqlalchemy.engine.Engine, name: str): with engine.connect() as connection: connection.execute('COMMIT') # Exit the implicit transaction. connection.execute(f'DROP DATABASE IF EXISTS {name}') def retry(message, f, tries = 1, delay = 3): try: print(f'Attempting to {message}...') f() except: if tries == 1: print(f'Failed to {message}. Giving up.') else: print(f'Failed to {message}. Waiting {delay} seconds then trying again.') time.sleep(delay) retry(message, f, tries = tries - 1) def metadata_schema_url( request: pytest.FixtureRequest, owner_engine: sqlalchemy.engine.Engine, runner_engine: sqlalchemy.engine.Engine, ): return create_schema(request, owner_engine, runner_engine, 'metadata').url def new_source( request: pytest.FixtureRequest, owner_engine: sqlalchemy.engine.Engine, runner_engine: sqlalchemy.engine.Engine, hge_ctx: HGECtx, name: Optional[str] = None, customization: Any = {}, ): source_name = name or 'default' prefix = f'source_{source_name}' backend = create_schema(request, owner_engine, runner_engine, prefix) add_source(hge_ctx, backend, source_name, customization) yield backend drop_source(hge_ctx, source_name) def add_source(hge_ctx: HGECtx, backend: Backend, source_name: str, customization: Any = {}): metadata = hge_ctx.v1metadataq({ 'type': 'export_metadata', 'args': {}, }) metadata['sources'].append({ 'name': source_name or 'default', 'kind': backend.kind, 'configuration': { 'connection_info': { 'database_url': backend.url, }, }, 'customization': customization, 'tables': [], }) hge_ctx.v1metadataq({ 'type': 'replace_metadata', 'args': metadata, }) def drop_source(hge_ctx: HGECtx, source_name: str): metadata = hge_ctx.v1metadataq({ 'type': 'export_metadata', 'args': {}, }) metadata['sources'] = [source for source in metadata['sources'] if source['name'] != source_name] hge_ctx.v1metadataq({ 'type': 'replace_metadata', 'args': metadata, }) def source_backend( request: pytest.FixtureRequest, owner_engine: sqlalchemy.engine.Engine, runner_engine: sqlalchemy.engine.Engine, hge_ctx: HGECtx, ): disabled_marker = request.node.get_closest_marker('default_source_disabled') if disabled_marker: yield None else: yield from new_source(request, owner_engine, runner_engine, hge_ctx) def postgis(owner_engine: sqlalchemy.engine.Engine, source_backend: Optional[Backend]): # TODO: remove once parallelization work is completed # `source_backend` will no longer be optional engine = switch_schema(owner_engine, source_backend.name) if source_backend else owner_engine with engine.connect() as connection: connection.execute('CREATE EXTENSION IF NOT EXISTS postgis') connection.execute('CREATE EXTENSION IF NOT EXISTS postgis_topology') result = connection.execute('SELECT PostGIS_lib_version() as postgis_version').fetchone() if not result: raise Exception('Could not detect the PostGIS version.') postgis_version: str = result['postgis_version'] if re.match('^3\\.', postgis_version): connection.execute('CREATE EXTENSION IF NOT EXISTS postgis_raster')