# graphql-engine/server/bench-wrk/run_hge.py
import os
import subprocess
import argparse
import json
import signal
import time
import contextlib
import requests
import inflection
import docker
from colorama import Fore, Style


def rm_file_if_exists(f):
    """Remove a file if it exists."""
    with contextlib.suppress(FileNotFoundError):
        os.remove(f)


class HGEError(Exception):
    """Exception type for class HGE"""


class HGE:

    default_graphql_env = {
        'HASURA_GRAPHQL_ENABLE_TELEMETRY': 'false',
        'EVENT_WEBHOOK_HEADER': 'MyEnvValue',
        'HASURA_GRAPHQL_STRINGIFY_NUMERIC_TYPES': 'true',
        'HASURA_GRAPHQL_CONSOLE_ASSETS_DIR': '../../console/static/dist/',
        'HASURA_GRAPHQL_ENABLE_CONSOLE': 'true'
    }

    def __init__(self, pg, port_allocator, docker_image=None, log_file='hge.log', url=None, args=None):
        self.pg = pg
        self.log_file = log_file
        # The .tix file is where GHC's HPC coverage output lands when the
        # binary is built with coverage enabled.
        self.tix_file = os.path.splitext(self.log_file)[0] + '.tix' if self.log_file else None
        self.docker_image = docker_image
        self.introspection = None
        self.obj_fk_rels = set()
        self.arr_fk_rels = set()
        self.port_allocator = port_allocator
        self.url = url
        self.proc = None
        self.container = None
        # Avoid a mutable default argument
        self.args = args if args is not None else []

    def admin_secret(self):
        admin_secret_env = os.environ.get('HASURA_GRAPHQL_ADMIN_SECRET')
        parser = argparse.ArgumentParser()
        parser.add_argument('--admin-secret', metavar='HASURA_GRAPHQL_ADMIN_SECRET', required=False)
        admin_secret_arg = parser.parse_known_args(self.args)[0].admin_secret
        return admin_secret_arg or admin_secret_env
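
    # For example (values hypothetical): with HASURA_GRAPHQL_ADMIN_SECRET set
    # to 'env_secret' and args=['--admin-secret', 'arg_secret'], this returns
    # 'arg_secret'; the command-line flag takes precedence over the
    # environment variable.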

    @classmethod
    def do_stack_build(cls):
        print(Fore.YELLOW + "Performing Cabal build first" + Style.RESET_ALL)
        # 'cabal new-run' below will also build, but we want to make sure that's a
        # noop so the server starts right away
        subprocess.check_call(['cabal', 'new-build', 'exe:graphql-engine'])

    def get_hge_env(self):
        hge_env = {
            **os.environ,
            **self.default_graphql_env,
            'HASURA_GRAPHQL_DATABASE_URL': self.pg.url,
            'HASURA_GRAPHQL_SERVER_PORT': str(self.port),
            'HASURA_GRAPHQL_SERVER_HOST': '127.0.0.1',
        }
        # Only set HPCTIXFILE when a log file (and hence a .tix path) was
        # configured.
        if self.tix_file:
            hge_env['HPCTIXFILE'] = self.tix_file
        return hge_env

    def run(self):
        if self.url:
            return
        if self.docker_image:
            self.run_with_docker()
        else:
            self.run_with_cabal()

    def run_with_docker(self):
        if self.url:
            return
        self.port = self.port_allocator.allocate_port(8080)
        hge_env = self.get_hge_env()
        process_args = ['graphql-engine', 'serve', *self.args]
        # NOTE: with network_mode='host' Docker ignores published ports, so
        # the server is reachable on self.port directly on the host.
        docker_ports = {str(self.port) + '/tcp': ('127.0.0.1', self.port)}
        self.docker_client = docker.from_env()
        print("Running GraphQL Engine docker with image:",
              self.docker_image, '(port:{})'.format(self.port))
        print(process_args)
        self.container = self.docker_client.containers.run(
            self.docker_image,
            command=process_args,
            detach=True,
            ports=docker_ports,
            environment=hge_env,
            network_mode='host',
            volumes={}
        )
        self.url = 'http://127.0.0.1:' + str(self.port)
        print("Waiting for GraphQL Engine to be running.", end='')
        self.wait_for_start()
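
    # A minimal usage sketch (assumptions: `pg` and `allocator` are the
    # Postgres and port-allocator helpers constructed elsewhere in this
    # harness, and the image tag is hypothetical):
    #
    #   hge = HGE(pg, allocator, docker_image='hasura/graphql-engine:latest')
    #   hge.run()        # starts the container and polls until it responds
    #   ...
    #   hge.teardown()   # stops and removes the container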

    def run_with_cabal(self):
        if self.url:
            return
        self.port = self.port_allocator.allocate_port(8080)
        rm_file_if_exists(self.tix_file)
        hge_env = self.get_hge_env()
        process_args = ['cabal', 'new-run', '--', 'exe:graphql-engine', 'serve', *self.args]
        print("Running GraphQL Engine with 'cabal new-run': (port:{})".format(self.port))
        print(process_args)
        self.log_fp = open(self.log_file, 'w')
        self.proc = subprocess.Popen(
            process_args,
            env=hge_env,
            shell=False,
            bufsize=-1,
            start_new_session=True,
            stdout=self.log_fp,
            stderr=subprocess.STDOUT
        )
        self.url = 'http://127.0.0.1:' + str(self.port)
        print("Waiting for GraphQL Engine to be running.", end='')
        self.wait_for_start()

    def check_if_process_is_running(self):
        if self.proc.poll() is not None:
            with open(self.log_file) as fr:
                raise HGEError(
                    "GraphQL engine failed with error: " + fr.read())

    def check_if_container_is_running(self):
        self.container.reload()
        if self.container.status == 'exited':
            # decode as UTF-8 rather than ASCII: the logs may contain
            # non-ASCII bytes
            raise HGEError(
                "GraphQL engine failed with error: \n" +
                self.container.logs(stdout=True, stderr=True).decode('utf-8')
            )

    def wait_for_start(self, timeout=120):
        if timeout <= 0:
            raise HGEError("Timeout waiting for graphql-engine process to start")
        if self.proc:
            self.check_if_process_is_running()
        elif self.container:
            self.check_if_container_is_running()
        try:
            q = {'query': 'query { __typename }'}
            r = requests.post(self.url + '/v1/graphql', json.dumps(q), headers=self.admin_auth_headers())
            if r.status_code == 200:
                print()
                return
        except (requests.exceptions.ConnectionError, ConnectionError):
            pass
        print(".", end="", flush=True)
        sleep_time = 0.5
        time.sleep(sleep_time)
        self.wait_for_start(timeout - sleep_time)

    def teardown(self):
        if getattr(self, 'log_fp', None):
            self.log_fp.close()
            self.log_fp = None
        if self.proc:
            self.cleanup_process()
        elif self.container:
            self.cleanup_docker()

    def cleanup_process(self):
        # TODO hangs
        print(Fore.YELLOW + "Stopping graphql engine at port:", self.port, Style.RESET_ALL)
        pgrp = os.getpgid(self.proc.pid)
        # Signal the whole process group: 'cabal new-run' spawns the server
        # as a child process, and start_new_session=True gave it its own group.
        os.killpg(pgrp, signal.SIGTERM)
        # NOTE this doesn't seem to work, although a SIGINT from terminal does ...
        # self.proc.send_signal(signal.SIGINT)
        self.proc.wait()
        self.proc = None

    def cleanup_docker(self):
        cntnr_info = "HGE docker container " + self.container.name + " " + repr(self.container.image)
        print(Fore.YELLOW + "Stopping " + cntnr_info + Style.RESET_ALL)
        self.container.stop()
        print(Fore.YELLOW + "Removing " + cntnr_info + Style.RESET_ALL)
        self.container.remove()
        self.container = None

    def admin_auth_headers(self):
        headers = {}
        admin_secret = self.admin_secret()
        if admin_secret:
            headers['X-Hasura-Admin-Secret'] = admin_secret
        return headers

    def v1q(self, q, exp_status=200):
        resp = requests.post(self.url + '/v1/query', json.dumps(q), headers=self.admin_auth_headers())
        # resp is a requests.Response and is not itself JSON-serializable;
        # use resp.text in the error message.
        assert resp.status_code == exp_status, \
            f'Expected status {exp_status}, got {resp.status_code}. Response:\n{resp.text}'
        return resp.json()
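
    # For example, a metadata export through the v1 query API (a sketch;
    # assumes the server is already running):
    #
    #   metadata = hge.v1q({'type': 'export_metadata', 'args': {}})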

    def graphql_q(self, query, variables=None, exp_status=200):
        q = {'query': query}
        if variables:
            q['variables'] = variables
        resp = requests.post(self.url + '/v1/graphql', json.dumps(q), headers=self.admin_auth_headers())
        assert resp.status_code == exp_status, (resp.status_code, resp.text)
        resp_json = resp.json()
        assert 'errors' not in resp_json, resp_json
        return resp_json

    def track_all_tables_in_schema(self, schema='public'):
        print("Track all tables in schema ", schema)
        all_tables = self.pg.get_all_tables_in_a_schema(schema)
        all_tables = [{'schema': schema, 'name': t} for t in all_tables]
        return self.track_tables(all_tables)

    def run_bulk(self, queries, exp_status=200):
        bulk_q = {
            'type': 'bulk',
            'args': queries
        }
        return self.v1q(bulk_q, exp_status)
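
    # e.g. (a sketch) running two statements in a single bulk request:
    #
    #   hge.run_bulk([
    #       {'type': 'run_sql', 'args': {'sql': 'CREATE TABLE a (id serial)'}},
    #       {'type': 'track_table', 'args': {'schema': 'public', 'name': 'a'}},
    #   ])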

    def select_simple(self, table, columns):
        query = {
            'type': 'select',
            'args': {
                'table': table,
                'columns': columns
            }
        }
        return self.v1q(query)

    def get_all_tracked_tables(self):
        table = {
            'schema': 'hdb_catalog',
            'name': 'hdb_table'
        }
        columns = ['table_schema', 'table_name']
        resp = self.select_simple(table, columns)
        tables = []
        for row in resp:
            tables.append({
                'schema': row['table_schema'],
                'name': row['table_name']
            })
        return tables

    def track_tables(self, tables, exp_status=200):
        queries = []
        for table in tables:
            q = {
                'type': 'track_table',
                'args': table
            }
            queries.append(q)
        return self.run_bulk(queries, exp_status)

    def track_table(self, table, exp_status=200):
        q = self.mk_track_table_q(table)
        return self.v1q(q, exp_status)

    def mk_track_table_q(self, table):
        return {
            'type': 'track_table',
            'args': table
        }
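
    # For example, mk_track_table_q({'schema': 'public', 'name': 'author'})
    # builds the payload:
    #
    #   {'type': 'track_table', 'args': {'schema': 'public', 'name': 'author'}}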

    def add_remote_schema(self, name, remote_url, headers=None, client_hdrs=False):
        # Avoid a mutable default argument
        headers = headers or {}

        def hdr_name_val_pair(headers):
            return [{'name': k, 'value': v} for (k, v) in headers.items()]

        if headers:
            client_hdrs = True
        q = {
            'type': 'add_remote_schema',
            'args': {
                'name': name,
                'comment': name,
                'definition': {
                    'url': remote_url,
                    'headers': hdr_name_val_pair(headers),
                    'forward_client_headers': client_hdrs
                }
            }
        }
        return self.v1q(q)

    def create_remote_obj_rel_to_itself(self, tables_schema, remote, remote_tables_schema):
        print("Creating remote relationships from tables in schema {} to themselves using remote {}".format(tables_schema, remote))
        fk_constrnts = self.pg.get_all_fk_constraints(tables_schema)
        for (s, _, t, c, _, ft, _) in fk_constrnts:
            table_cols = self.pg.get_all_columns_of_a_table(t, s)
            if 'id' not in table_cols:
                continue
            rel_name = 'remote_' + inflection.singularize(t) + '_via_' + c
            query = {
                'type': 'create_remote_relationship',
                'args': {
                    'name': rel_name,
                    'table': {
                        'schema': s,
                        'name': t
                    },
                    'remote_schema': remote,
                    'hasura_fields': ['id', c],
                    'remote_field': {
                        remote_tables_schema + '_' + ft + '_by_pk': {
                            'arguments': {
                                'id': '$' + c
                            },
                            'field': {
                                inflection.pluralize(t) + '_by_' + c: {
                                    'arguments': {
                                        'where': {
                                            c: {
                                                '_eq': '$id'
                                            }
                                        }
                                    }
                                }
                            }
                        }
                    }
                }
            }
            print(query)
            self.v1q(query)
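
    # For a hypothetical FK article.author_id -> author.id (with the remote
    # exposing public_author_by_pk and an articles_by_author_id field), this
    # generates a relationship named 'remote_article_via_author_id' that
    # joins an article back to the articles sharing the same author.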

    def create_remote_obj_fk_ish_relationships(self, tables_schema, remote, remote_tables_schema):
        print("Creating object foreign-key-ish relationships for tables in schema {} using remote {}".format(tables_schema, remote))
        fk_constrnts = self.pg.get_all_fk_constraints(tables_schema)
        for (s, _, t, c, _, ft, _) in fk_constrnts:
            rel_name = inflection.singularize(ft)
            if c.endswith('_id'):
                rel_name = c[:-3]
            rel_name = 'remote_' + rel_name
            query = {
                'type': 'create_remote_relationship',
                'args': {
                    'name': rel_name,
                    'table': {
                        'schema': s,
                        'name': t
                    },
                    'remote_schema': remote,
                    'hasura_fields': [c],
                    'remote_field': {
                        remote_tables_schema + '_' + ft + '_by_pk': {
                            'arguments': {
                                'id': '$' + c
                            }
                        }
                    }
                }
            }
            print(query)
            self.v1q(query)

    def create_obj_fk_relationships(self, schema='public'):
        print("Creating object foreign key relationships for tables in schema ", schema)
        fk_constrnts = self.pg.get_all_fk_constraints(schema)
        queries = []
        for (s, _, t, c, _, ft, _) in fk_constrnts:
            rel_name = inflection.singularize(ft)
            if c.endswith('_id'):
                rel_name = c[:-3]
            table_cols = self.pg.get_all_columns_of_a_table(t, s)
            # Avoid clashing with an existing column of the same name
            if rel_name in table_cols:
                rel_name += '_' + inflection.singularize(ft)
            queries.append({
                'type': 'create_object_relationship',
                'args': {
                    'table': {
                        'schema': s,
                        'name': t
                    },
                    'name': rel_name,
                    'using': {
                        'foreign_key_constraint_on': c
                    }
                }
            })
            self.obj_fk_rels.add(((s, t), rel_name))
        return self.run_bulk(queries)
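
    # For a hypothetical FK article.author_id -> author.id, this creates an
    # object relationship named 'author' on 'article' (the '_id' suffix is
    # stripped), falling back to 'author_author' if 'article' already has a
    # column called 'author'.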

    def create_remote_arr_fk_ish_relationships(self, tables_schema, remote, remote_tables_schema):
        fk_constrnts = self.pg.get_all_fk_constraints(tables_schema)
        for (_, _, t, c, fs, ft, _) in fk_constrnts:
            rel_name = 'remote_' + inflection.pluralize(t) + '_by_' + c
            query = {
                'type': 'create_remote_relationship',
                'args': {
                    'name': rel_name,
                    'table': {
                        'schema': fs,
                        'name': ft
                    },
                    'remote_schema': remote,
                    'hasura_fields': ['id'],
                    'remote_field': {
                        remote_tables_schema + '_' + t: {
                            'arguments': {
                                'where': {
                                    c: {
                                        '_eq': '$id'
                                    }
                                }
                            }
                        }
                    }
                }
            }
            print(query)
            self.v1q(query)

    def create_arr_fk_relationships(self, schema='public'):
        print("Creating array foreign key relationships for tables in schema ", schema)
        fk_constrnts = self.pg.get_all_fk_constraints(schema)
        queries = []
        for (s, _, t, c, fs, ft, _) in fk_constrnts:
            rel_name = inflection.pluralize(t) + '_by_' + c
            queries.append({
                'type': 'create_array_relationship',
                'args': {
                    'table': {
                        'schema': fs,
                        'name': ft
                    },
                    'name': rel_name,
                    'using': {
                        'foreign_key_constraint_on': {
                            'table': {
                                'schema': s,
                                'name': t
                            },
                            'column': c
                        }
                    }
                }
            })
            self.arr_fk_rels.add(((fs, ft), rel_name))
        return self.run_bulk(queries)
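
    # Mirror image of create_obj_fk_relationships: for a hypothetical FK
    # article.author_id -> author.id, this creates an array relationship
    # named 'articles_by_author_id' on 'author'.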

    def run_sql(self, sql):
        """Run the given SQL query via the run_sql metadata API."""
        def mk_run_sql_q(sql):
            return {
                'type': 'run_sql',
                'args': {
                    'sql': sql
                }
            }
        return self.v1q(mk_run_sql_q(sql))
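

# A minimal end-to-end sketch of how this class is typically driven
# (assumptions: `pg` and `port_allocator` are the Postgres and port-allocator
# helpers constructed elsewhere in bench-wrk):
#
#   hge = HGE(pg, port_allocator)
#   hge.run()
#   hge.track_all_tables_in_schema('public')
#   hge.create_obj_fk_relationships()
#   hge.create_arr_fk_relationships()
#   print(hge.graphql_q('query { __typename }'))
#   hge.teardown()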