mirror of
https://github.com/hasura/graphql-engine.git
synced 2024-12-15 17:31:56 +03:00
91aee7fdeb
We add a new pytest flag `--accept` that will automatically write back yaml files with updated responses. This makes it much easier and less error-prone to update test cases when we expect output to change, or when authoring new tests. Second we make sure to test that we actually preserve the order of the selection set when returning results. This is a "SHOULD" part of the spec but seems pretty important and something that users will rely on. To support both of the above we use ruamel.yaml which preserves a certain amount of formatting and comments (so that --accept can work in a fairly ergonomic way), as well as ordering (so that when we write yaml the order of keys has meaning that's preserved during parsing). Use ruamel.yaml everywhere for consistency (since both libraries have different quirks). Quirks of ruamel.yaml: - trailing whitespace in multiline strings in yaml files isn't written back out as we'd like: https://bitbucket.org/ruamel/yaml/issues/47/multiline-strings-being-changed-if-they - formatting is only sort of preserved; ruamel e.g. normalizes indentation. Normally the diff is pretty clean though, and you can always just check in portions of your test file after --accept fixup
382 lines
14 KiB
Python
382 lines
14 KiB
Python
#!/usr/bin/env python3
|
|
|
|
import pytest
|
|
import ruamel.yaml as yaml
|
|
import json
|
|
import copy
|
|
import graphql
|
|
import os
|
|
import base64
|
|
import json
|
|
import jsondiff
|
|
import jwt
|
|
import random
|
|
import time
|
|
import warnings
|
|
|
|
from context import GQLWsClient
|
|
|
|
def check_keys(keys, obj):
    """Assert that every key listed in *keys* is present in mapping *obj*.

    On failure the whole object is shown so the missing key is easy to spot.
    """
    for required in keys:
        assert required in obj, obj
|
|
|
|
|
|
def check_ev_payload_shape(ev_payload):
    """Assert that an event-trigger delivery payload has the expected
    envelope shape: top-level metadata keys, plus the nested 'event' and
    'trigger' sub-objects.
    """
    # Top-level envelope of every event delivery.
    check_keys(["created_at", "event", "id", "table", "trigger"], ev_payload)

    # The 'event' sub-object carries the operation and row data.
    check_keys(["data", "op"], ev_payload['event'])

    # The 'trigger' sub-object identifies which trigger fired.
    check_keys(["name"], ev_payload['trigger'])
|
|
|
|
|
|
def validate_event_payload(ev_payload, trig_name, table):
    """Check the payload's shape, then that it references the expected
    table and trigger name.
    """
    check_ev_payload_shape(ev_payload)
    # Shape is fine — now pin the payload to the table/trigger under test.
    assert ev_payload['table'] == table, ev_payload
    assert ev_payload['trigger']['name'] == trig_name, ev_payload
|
|
|
|
|
|
def validate_event_headers(ev_headers, headers):
    """Assert that each expected header (name -> value) appears with the
    same value in the headers actually delivered with the event.
    """
    for name, expected in headers.items():
        delivered = ev_headers.get(name)
        assert delivered == expected, (name, delivered)
|
|
|
|
|
|
def validate_event_webhook(ev_webhook_path, webhook_path):
    """Assert the event was delivered to the webhook path we expected."""
    delivered_to = ev_webhook_path
    assert delivered_to == webhook_path
|
|
|
|
|
|
def check_event(hge_ctx, evts_webhook, trig_name, table, operation, exp_ev_data,
                headers=None,
                webhook_path='/',
                session_variables=None
                ):
    """Pop the next event delivered to the events webhook and validate it.

    Checks, in order: the webhook path, the delivered headers, the payload
    envelope (trigger name / table), and finally the operation, session
    variables and row data inside the event.

    headers / session_variables default to {} and the admin role. The
    previous mutable default arguments ({} and a dict literal) are replaced
    with None sentinels so a single dict is never shared across calls.
    """
    if headers is None:
        headers = {}
    if session_variables is None:
        session_variables = {'x-hasura-role': 'admin'}

    # get_event(3): presumably a 3-second delivery timeout — confirm
    # against the evts_webhook fixture.
    ev_full = evts_webhook.get_event(3)
    validate_event_webhook(ev_full['path'], webhook_path)
    validate_event_headers(ev_full['headers'], headers)
    validate_event_payload(ev_full['body'], trig_name, table)
    ev = ev_full['body']['event']
    assert ev['op'] == operation, ev
    assert ev['session_variables'] == session_variables, ev
    assert ev['data'] == exp_ev_data, ev
|
|
|
|
|
|
def test_forbidden_when_admin_secret_reqd(hge_ctx, conf):
    """Check that the query in *conf* is rejected both without an admin
    secret and with a random (wrong) one.

    For /v1/graphql auth errors surface as 200 with errors in the body
    (or 404 when the test itself expects 404); other endpoints must
    answer 401/404 outright.
    """
    if conf['url'] == '/v1/graphql':
        if conf['status'] == 404:
            status = [404]
        else:
            status = [200]
    else:
        status = [401, 404]

    # Copy the test-case headers. Previously this function aliased
    # conf['headers'] and injected a bogus X-Hasura-Admin-Secret into it,
    # leaking the header back into the caller's conf dict.
    headers = {}
    if 'headers' in conf:
        headers = dict(conf['headers'])

    # Test without admin secret
    code, resp = hge_ctx.anyq(conf['url'], conf['query'], headers)
    assert code in status, "\n" + yaml.dump({
        "expected": "Should be access denied as admin secret is not provided",
        "actual": {
            "code": code,
            "response": resp
        }
    })

    # Test with random admin secret
    headers['X-Hasura-Admin-Secret'] = base64.b64encode(os.urandom(30))
    code, resp = hge_ctx.anyq(conf['url'], conf['query'], headers)
    assert code in status, "\n" + yaml.dump({
        "expected": "Should be access denied as an incorrect admin secret is provided",
        "actual": {
            "code": code,
            "response": resp
        }
    })
|
|
|
|
|
|
def test_forbidden_webhook(hge_ctx, conf):
    """Check that the query is denied when the auth webhook rejects the
    request's bearer token.

    /v1/graphql reports auth failures with a 200 (or the expected 404);
    older endpoints answer 401/404 directly.
    """
    if conf['url'] == '/v1/graphql':
        expected_codes = [404] if conf['status'] == 404 else [200]
    else:
        expected_codes = [401, 404]

    # A random token the webhook cannot possibly recognise.
    garbage_token = base64.b64encode(base64.b64encode(os.urandom(30))).decode('utf-8')
    h = {'Authorization': 'Bearer ' + garbage_token}
    code, resp = hge_ctx.anyq(conf['url'], conf['query'], h)
    assert code in expected_codes, "\n" + yaml.dump({
        "expected": "Should be access denied as it is denied from webhook",
        "actual": {
            "code": code,
            "response": resp
        }
    })
|
|
|
|
|
|
# Returns the response received and a bool indicating whether the test passed
|
|
# or not (this will always be True unless we are `--accepting`)
|
|
def check_query(hge_ctx, conf, transport='http', add_auth=True):
    """Run the single test case 'conf' against the server over 'transport'
    ('http' or 'websocket'), optionally decorating the request with auth
    credentials (JWT / webhook token / admin secret) chosen from what
    hge_ctx is configured with.

    Returns the response received and a bool indicating whether the test
    passed (always True unless running with `--accept`).
    """
    headers = {}
    if 'headers' in conf:
        headers = conf['headers']

    # No headers in conf => Admin role
    # Set the X-Hasura-Role header randomly
    # If header is set, jwt/webhook auth will happen
    # Otherwise admin-secret will be set
    if len(headers) == 0 and random.choice([True, False]):
        headers['X-Hasura-Role'] = 'admin'

    if add_auth:
        # Use the hasura role specified in the test case, and create a JWT token
        if hge_ctx.hge_jwt_key is not None and len(headers) > 0 and 'X-Hasura-Role' in headers:
            hClaims = dict()
            hClaims['X-Hasura-Allowed-Roles'] = [headers['X-Hasura-Role']]
            hClaims['X-Hasura-Default-Role'] = headers['X-Hasura-Role']
            # Every other test-case header becomes an extra hasura claim.
            for key in headers:
                if key != 'X-Hasura-Role':
                    hClaims[key] = headers[key]
            claim = {
                "sub": "foo",
                "name": "bar",
                "https://hasura.io/jwt/claims": hClaims
            }
            headers['Authorization'] = 'Bearer ' + jwt.encode(claim, hge_ctx.hge_jwt_key, algorithm='RS512').decode(
                'UTF-8')

        # Use the hasura role specified in the test case, and create an authorization token which will be verified by webhook
        if hge_ctx.hge_webhook is not None and len(headers) > 0:
            if not hge_ctx.webhook_insecure:
                # Check whether the output is also forbidden when webhook returns forbidden
                test_forbidden_webhook(hge_ctx, conf)
            headers['X-Hasura-Auth-Mode'] = 'webhook'
            # Replace all headers with a single bearer token carrying them
            # b64-encoded — presumably the test webhook decodes this token
            # back into session variables; verify against the webhook fixture.
            headers_new = dict()
            headers_new['Authorization'] = 'Bearer ' + base64.b64encode(json.dumps(headers).encode('utf-8')).decode(
                'utf-8')
            headers = headers_new

        # The case as admin with admin-secret and jwt/webhook
        elif (
                hge_ctx.hge_webhook is not None or hge_ctx.hge_jwt_key is not None) and hge_ctx.hge_key is not None and len(
                headers) == 0:
            headers['X-Hasura-Admin-Secret'] = hge_ctx.hge_key

        # The case as admin with only admin-secret
        elif hge_ctx.hge_key is not None and hge_ctx.hge_webhook is None and hge_ctx.hge_jwt_key is None:
            # Test whether it is forbidden when incorrect/no admin_secret is specified
            test_forbidden_when_admin_secret_reqd(hge_ctx, conf)
            headers['X-Hasura-Admin-Secret'] = hge_ctx.hge_key

    assert transport in ['websocket', 'http'], "Unknown transport type " + transport
    if transport == 'websocket':
        # Websocket tests must state an expected response up front and
        # target a graphql endpoint.
        assert 'response' in conf
        assert conf['url'].endswith('/graphql')
        print('running on websocket')
        return validate_gql_ws_q(
            hge_ctx,
            conf['url'],
            conf['query'],
            headers,
            conf['response'],
            True
        )
    elif transport == 'http':
        print('running on http')
        return validate_http_anyq(hge_ctx, conf['url'], conf['query'], headers,
                                  conf['status'], conf.get('response'))
|
|
|
|
|
|
|
|
def validate_gql_ws_q(hge_ctx, endpoint, query, headers, exp_http_response, retry=False):
    """Run *query* over a graphql websocket connection against *endpoint*
    and compare the payload against *exp_http_response*.

    retry: if True and the server answers 'complete' before sending any
    payload, recreate the connection and retry exactly once.

    Returns (response, matched) from assert_graphql_resp_expected.
    """
    if endpoint == '/v1alpha1/graphql':
        ws_client = GQLWsClient(hge_ctx, '/v1alpha1/graphql')
    else:
        ws_client = hge_ctx.ws_client
    print(ws_client.ws_url)
    # No headers => start an unauthenticated connection.
    if not headers or len(headers) == 0:
        ws_client.init({})

    query_resp = ws_client.send_query(query, headers=headers, timeout=15)
    resp = next(query_resp)
    print('websocket resp: ', resp)

    if resp.get('type') == 'complete':
        if retry:
            # Got query complete before payload. Retry once more
            print("Got query complete before getting query response payload. Retrying")
            ws_client.recreate_conn()
            # BUG FIX: 'endpoint' was previously omitted from this recursive
            # call, shifting every argument one position to the left
            # (query became the endpoint, headers the query, ...).
            return validate_gql_ws_q(hge_ctx, endpoint, query, headers, exp_http_response, False)
        else:
            assert resp['type'] in ['data', 'error'], resp

    # Expected errors may legitimately arrive as either 'data' or 'error'
    # frames; a successful query must be a 'data' frame.
    if 'errors' in exp_http_response or 'error' in exp_http_response:
        assert resp['type'] in ['data', 'error'], resp
    else:
        assert resp['type'] == 'data', resp

    assert 'payload' in resp, resp
    # The server must terminate the (non-subscription) query with 'complete'.
    resp_done = next(query_resp)
    assert resp_done['type'] == 'complete'

    return assert_graphql_resp_expected(resp['payload'], exp_http_response, query)
|
|
|
|
|
|
def validate_http_anyq(hge_ctx, url, query, headers, exp_code, exp_response):
    """POST *query* to *url* over http, assert the status code, and — when
    an expected response body is supplied — compare against it.

    Returns (response, passed).
    """
    actual_code, actual_resp = hge_ctx.anyq(url, query, headers)
    print(headers)
    assert actual_code == exp_code, actual_resp
    print('http resp: ', actual_resp)
    if not exp_response:
        # No expected body: the status-code check alone is the test.
        return actual_resp, True
    return assert_graphql_resp_expected(actual_resp, exp_response, query)
|
|
|
|
# Check the actual graphql response is what we expected, also taking into
|
|
# consideration the ordering of keys that we expect to be preserved, based on
|
|
# 'query'.
|
|
#
|
|
# Returns 'resp' and a bool indicating whether the test passed or not (this
|
|
# will always be True unless we are `--accepting`)
|
|
def assert_graphql_resp_expected(resp_orig, exp_response_orig, query):
    """Compare the actual graphql response with the expected one, honouring
    only the key ordering that the query's selection set dictates.

    Returns (resp, matched); matched is always True unless running with
    `--accept`, in which case the assertion is skipped.
    """
    # Normalise both sides so that only selection-set ordering matters.
    actual = collapse_order_not_selset(resp_orig, query)
    expected = collapse_order_not_selset(exp_response_orig, query)
    matched = actual == expected

    if pytest.config.getoption("--accept"):
        print('skipping assertion since we chose to --accept new output')
        return actual, matched

    def render_failure():
        # Keep strict received order when displaying errors.
        raw_diff = stringify_keys(jsondiff.diff(expected, actual))
        shown = "(results differ only in their order of keys)" if raw_diff == {} else raw_diff
        return '\n' + yaml.dump({
            'response': resp_orig,
            'expected': exp_response_orig,
            'diff': shown
        }, Dumper=yaml.RoundTripDumper)

    # The message is only rendered when the assertion actually fails.
    assert matched, render_failure()
    return actual, matched
|
|
|
|
|
|
def check_query_f(hge_ctx, f, transport='http', add_auth=True):
    """Run the test case (or list of test cases) described by yaml file *f*.

    With `--accept`, any case whose actual response differed from the
    expected one is recorded back into the file.
    """
    print("Test file: " + f)
    hge_ctx.may_skip_test_teardown = False
    print("transport=" + transport)
    with open(f, 'r+') as conf_file:
        # RoundTripLoader preserves key order (so the JSON-ordering property
        # can be tested) and formatting/comments (so `--accept` can write
        # the file back with minimal churn).
        conf = yaml.load(conf_file, yaml.RoundTripLoader)

        # Set when `--accept` recorded a new expected response.
        dirty = False

        if isinstance(conf, list):
            for sconf in conf:
                actual_resp, matched = check_query(hge_ctx, sconf, transport, add_auth)
                if not matched and pytest.config.getoption("--accept"):
                    sconf['response'] = actual_resp
                    dirty = True
        else:
            if conf['status'] != 200:
                hge_ctx.may_skip_test_teardown = True
            actual_resp, matched = check_query(hge_ctx, conf, transport, add_auth)
            # If using `--accept` write the file back out with the new
            # expected response set to the actual response we got:
            if not matched and pytest.config.getoption("--accept"):
                conf['response'] = actual_resp
                dirty = True

        # TODO only write back when this test is not xfail. I'm stumped on how
        # best to do this. Where the 'request' fixture comes into scope we can
        # do : `request.node.get_closest_marker("xfail")` but don't want to
        # require that everywhere...
        if dirty:
            warnings.warn(
                "\nRecording formerly failing case as correct in: " + f +
                "\n NOTE: if this case was marked 'xfail' this won't be correct!"
            )
            conf_file.seek(0)
            conf_file.write(yaml.dump(conf, Dumper=yaml.RoundTripDumper))
            conf_file.truncate()
|
|
|
|
|
|
# Return a new dict that discards the object key ordering properties of
|
|
# 'result' where the key is not part of the selection set. This lets us compare
|
|
# expected and actual results properly with respect to the graphql spec's
|
|
# ordering requirements.
|
|
def collapse_order_not_selset(result_inp, query):
    """Return a copy of 'result_inp' where key ordering is discarded
    everywhere except along the query's selection set, so expected and
    actual results can be compared per the graphql spec's ordering rules.
    """
    # Collapse to unordered dict recursively by roundtripping through json
    def collapse(x):
        return json.loads(json.dumps(x))

    # Work on a copy: 'go' below rewrites result nodes in place.
    result = copy.deepcopy(result_inp)
    try:
        if 'query' in query:
            gql_query_str = query['query']
            # We don't support multiple operations in the same query yet:
            selset0 = graphql.parse(gql_query_str).definitions[0].selection_set
            def go(result_node, selset):
                # Walk the result tree alongside the query's selection set,
                # collapsing ordering only below fields with no sub-selection.
                for field in selset.selections:
                    fname = field.name.value

                    # If field has no subfields then all its values can be recursively stripped of ordering.
                    # Also if it's an array for some reason (like in 'returning') TODO make this better
                    if field.selection_set is None or not isinstance(result_node[fname], (dict, list)):
                        result_node[fname] = collapse(result_node[fname])
                    elif isinstance(result_node[fname], list):
                        # Lists of objects: recurse into each element.
                        for node in result_node[fname]:
                            go(node, field.selection_set)
                    else:
                        go(result_node[fname], field.selection_set)

            if 'data' in result:
                go(result['data'], selset0)
            # errors is unordered I guess
            if 'errors' in result:
                result['errors'] = collapse(result['errors'])
            # and finally remove ordering at just the topmost level:
            return dict(result)
        else:
            # this isn't a graphql query, collapse ordering, I guess:
            return collapse(result_inp)

    # Bail out here for any number of reasons (e.g. selection-set fields
    # missing from the result, unparseable query). TODO improve me
    except Exception as e:
        print("Bailing out and collapsing all ordering, due to: ", e)
        return collapse(result)
|
|
|
|
|
|
# Use this since jsondiff seems to produce object/dict structures that can't
|
|
# always be serialized to json.
|
|
# Copy-pasta from: https://stackoverflow.com/q/12734517/176841
|
|
def stringify_keys(d):
    """Convert a dict's keys to strings if they are not, recursing into
    nested dict values. Mutates *d* in place and returns it.

    Needed because jsondiff produces object/dict structures whose keys
    can't always be serialized to json.
    Copy-pasta from: https://stackoverflow.com/q/12734517/176841
    """
    # BUG FIX: snapshot the keys before looping. The loop adds and deletes
    # entries, and mutating a dict while iterating d.keys() directly raises
    # RuntimeError (or silently misiterates) in Python 3 whenever a
    # non-string key is present.
    for key in list(d.keys()):
        # Recurse into nested dicts first.
        if isinstance(d[key], dict):
            value = stringify_keys(d[key])
        else:
            value = d[key]

        # Convert a non-string key: bytes decode cleanly, anything else
        # falls back to its repr; unexpected failures propagate.
        if not isinstance(key, str):
            try:
                d[key.decode("utf-8")] = value
            except Exception:
                d[repr(key)] = value
            # Drop the old, non-string key.
            del d[key]
    return d
|