graphql-engine/server/tests-py/test_jwt.py

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

607 lines
25 KiB
Python
Raw Normal View History

from datetime import datetime, timedelta, timezone
import math
import json
import time
import ruamel.yaml as yaml
import pytest
import jwt
from test_subscriptions import init_ws_conn
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives import serialization
from validate import check_query, mk_claims_with_namespace_path
from context import PytestConf
# Skip the whole module unless both JWT-related pytest options were supplied.
if not PytestConf.config.getoption('--hge-jwt-key-file'):
    pytest.skip('--hge-jwt-key-file is missing, skipping JWT tests', allow_module_level=True)
if not PytestConf.config.getoption('--hge-jwt-conf'):
    # The message names the option that is actually checked above.
    pytest.skip('--hge-jwt-conf is missing, skipping JWT tests', allow_module_level=True)
def get_claims_fmt(raw_conf):
    """Return the 'claims_format' entry of a JSON-encoded JWT config.

    `raw_conf` is the config as a JSON string. When the decoded config has no
    'claims_format' key the format defaults to 'json'.
    """
    parsed = json.loads(raw_conf)
    try:
        return parsed['claims_format']
    except KeyError:
        return 'json'
def mk_claims(conf, claims):
    """Encode `claims` the way the JWT config's claims format asks for.

    `conf` is the JSON-encoded JWT config. For 'stringified_json' the claims
    are serialised with json.dumps; for 'json' and for any other format value
    they are returned unchanged.
    """
    if get_claims_fmt(conf) == 'stringified_json':
        return json.dumps(claims)
    return claims
def get_header_fmt(raw_conf):
    """Return the (header type, cookie name) pair described by a JWT config.

    `raw_conf` is the config as a JSON string.

    Returns ('Authorization', None) when the config has no 'header' section,
    when that section has no 'type', or when the type is 'Authorization'.
    Returns ('Cookie', <name>) for a Cookie header.

    Raises Exception when the header type is neither 'Authorization' nor
    'Cookie', or when a Cookie header is configured without a 'name'.
    """
    conf = json.loads(raw_conf)
    # Only the lookup of the header type may fall back to the default; a
    # KeyError raised further down must not be mistaken for "no header conf".
    try:
        header_conf = conf['header']
        hdr_fmt = header_conf['type']
    except KeyError:
        print('header conf not found in JWT conf, defaulting to Authorization')
        return ('Authorization', None)

    if hdr_fmt == 'Authorization':
        return (hdr_fmt, None)
    if hdr_fmt == 'Cookie':
        if 'name' not in header_conf:
            # Previously this silently fell back to the Authorization header.
            raise Exception('Invalid JWT header format: %s' % conf)
        return (hdr_fmt, header_conf['name'])
    raise Exception('Invalid JWT header format: %s' % conf)
def mk_authz_header(conf, token):
    """Build the single-entry HTTP header dict that carries `token`.

    The header type comes from get_header_fmt(conf): an Authorization header
    gets a 'Bearer ' prefixed value, a Cookie header gets '<name>=<token>'.
    Any other header type raises Exception.
    """
    kind, cookie_name = get_header_fmt(conf)
    if kind == 'Cookie':
        return {'Cookie': cookie_name + '=' + token}
    if kind == 'Authorization':
        return {'Authorization': 'Bearer ' + token}
    raise Exception('Invalid JWT header format')
@pytest.mark.parametrize('endpoint', ['/v1/graphql', '/v1alpha1/graphql'])
class TestJWTExpirySkew():
    """Checks that a token whose 'exp' lies 30 seconds in the past is still
    accepted when the JWT config sets 'allowed_skew'."""

    def test_jwt_expiry_leeway(self, hge_ctx, endpoint):
        """Send a query with a just-expired token and expect a normal response.

        Skipped when the JWT config has no 'allowed_skew' entry.
        NOTE(review): presumably the configured skew is larger than 30s -
        confirm against the test runner's JWT config.
        """
        jwt_conf = hge_ctx.hge_jwt_conf_dict
        # Decide about skipping before doing any other work.
        if 'allowed_skew' not in jwt_conf:
            pytest.skip("This test expects 'allowed_skew' to be set in the JWT config")

        claims_namespace_path = jwt_conf.get('claims_namespace_path')
        hasura_claims = mk_claims(hge_ctx.hge_jwt_conf, {
            'x-hasura-user-id': '1',
            'x-hasura-default-role': 'user',
            'x-hasura-allowed-roles': ['user'],
        })
        self.claims = mk_claims_with_namespace_path(self.claims, hasura_claims, claims_namespace_path)

        # Overwrite the fixture's expiry with one 30 seconds in the past.
        exp = datetime.now(timezone.utc) - timedelta(seconds=30)
        self.claims['exp'] = round(exp.timestamp())
        token = jwt.encode(self.claims, hge_ctx.hge_jwt_key, algorithm=hge_ctx.hge_jwt_algo)

        # Use the configured header type (Authorization or Cookie), like the
        # tests in TestJWTBasic do, instead of hard-coding Authorization.
        self.conf['headers'].update(mk_authz_header(hge_ctx.hge_jwt_conf, token))
        self.conf['response'] = {
            'data': {
                'article': [{
                    'id': 1,
                    'title': 'Article 1',
                    'content': 'Sample article content 1',
                    'is_published': False,
                    'author': {
                        'id': 1,
                        'name': 'Author 1'
                    }
                }]
            }
        }
        self.conf['url'] = endpoint
        self.conf['status'] = 200
        print("conf is ", self.conf)
        check_query(hge_ctx, self.conf, add_auth=False, claims_namespace_path=claims_namespace_path)

    @pytest.fixture(autouse=True)
    def transact(self, setup):
        """Per-test fixture: load the query config and build the base claims.

        The base claims carry 'sub', 'name', 'iat' (now) and 'exp' (now + 10h).
        """
        self.dir = 'queries/graphql_query/permissions'
        with open(self.dir + '/user_select_query_unpublished_articles.yaml') as c:
            self.conf = yaml.safe_load(c)
        curr_time = datetime.now()
        exp_time = curr_time + timedelta(hours=10)
        self.claims = {
            'sub': '1234567890',
            'name': 'John Doe',
            'iat': math.floor(curr_time.timestamp()),
            'exp': math.floor(exp_time.timestamp())
        }

    @pytest.fixture(scope='class')
    def setup(self, request, hge_ctx):
        """Class-scoped fixture: run setup.yaml before the tests and
        teardown.yaml after them, asserting a 200 status for each."""
        self.dir = 'queries/graphql_query/permissions'
        st_code, resp = hge_ctx.v1q_f(self.dir + '/setup.yaml')
        assert st_code == 200, resp
        yield
        st_code, resp = hge_ctx.v1q_f(self.dir + '/teardown.yaml')
        assert st_code == 200, resp
@pytest.mark.parametrize('endpoint', ['/v1/graphql', '/v1alpha1/graphql'])
class TestJWTBasic():
    """Claim, expiry, signature, audience and issuer checks for JWT auth,
    each run against both GraphQL endpoints."""

    def _apply_hasura_claims(self, hge_ctx, hasura_claims):
        """Merge `hasura_claims` into self.claims and return the namespace path.

        The claims are encoded with mk_claims per the JWT config and then
        placed with mk_claims_with_namespace_path. The returned path is None
        when the JWT config has no 'claims_namespace_path' entry.
        """
        claims_namespace_path = hge_ctx.hge_jwt_conf_dict.get('claims_namespace_path')
        encoded_claims = mk_claims(hge_ctx.hge_jwt_conf, hasura_claims)
        self.claims = mk_claims_with_namespace_path(self.claims, encoded_claims, claims_namespace_path)
        return claims_namespace_path

    def _sign(self, hge_ctx):
        """Encode self.claims with the configured key and algorithm."""
        return jwt.encode(self.claims, hge_ctx.hge_jwt_key, algorithm=hge_ctx.hge_jwt_algo)

    def _expect_error(self, endpoint, code, message):
        """Set the expected error response and the expected HTTP status.

        The expected status is 200 on /v1/graphql and 400 on /v1alpha1/graphql.
        """
        self.conf['response'] = {
            'errors': [{
                'extensions': {
                    'code': code,
                    'path': '$'
                },
                'message': message
            }]
        }
        if endpoint == '/v1/graphql':
            self.conf['status'] = 200
        if endpoint == '/v1alpha1/graphql':
            self.conf['status'] = 400

    def _check(self, hge_ctx, endpoint, token, claims_namespace_path):
        """Attach `token` using the configured header type and run the query."""
        self.conf['headers'].update(mk_authz_header(hge_ctx.hge_jwt_conf, token))
        self.conf['url'] = endpoint
        check_query(hge_ctx, self.conf, add_auth=False, claims_namespace_path=claims_namespace_path)

    def test_jwt_valid_claims_success(self, hge_ctx, endpoint):
        """A token with complete claims gets status 200 and the config's response."""
        claims_namespace_path = self._apply_hasura_claims(hge_ctx, {
            'x-hasura-user-id': '1',
            'x-hasura-allowed-roles': ['user', 'editor'],
            'x-hasura-default-role': 'user'
        })
        self.conf['status'] = 200
        self._check(hge_ctx, endpoint, self._sign(hge_ctx), claims_namespace_path)

    def test_jwt_invalid_role_in_request_header(self, hge_ctx, endpoint):
        """Allowed roles without 'user' yield an 'access-denied' error.

        NOTE(review): presumably the loaded query config requests the 'user'
        role via a request header - confirm against the yaml file.
        """
        claims_namespace_path = self._apply_hasura_claims(hge_ctx, {
            'x-hasura-user-id': '1',
            'x-hasura-allowed-roles': ['contractor', 'editor'],
            'x-hasura-default-role': 'contractor'
        })
        self._expect_error(endpoint, 'access-denied', 'Your requested role is not in allowed roles')
        self._check(hge_ctx, endpoint, self._sign(hge_ctx), claims_namespace_path)

    def test_jwt_no_allowed_roles_in_claim(self, hge_ctx, endpoint):
        """Claims without x-hasura-allowed-roles yield 'jwt-missing-role-claims'."""
        claims_namespace_path = self._apply_hasura_claims(hge_ctx, {
            'x-hasura-user-id': '1',
            'x-hasura-default-role': 'user'
        })
        self._expect_error(endpoint, 'jwt-missing-role-claims',
                           'JWT claim does not contain x-hasura-allowed-roles')
        self._check(hge_ctx, endpoint, self._sign(hge_ctx), claims_namespace_path)

    def test_jwt_invalid_allowed_roles_in_claim(self, hge_ctx, endpoint):
        """A string instead of a list for allowed roles yields 'jwt-invalid-claims'."""
        claims_namespace_path = self._apply_hasura_claims(hge_ctx, {
            'x-hasura-user-id': '1',
            'x-hasura-allowed-roles': 'user',
            'x-hasura-default-role': 'user'
        })
        self._expect_error(
            endpoint, 'jwt-invalid-claims',
            'invalid x-hasura-allowed-roles; should be a list of roles: parsing [] failed, expected Array, but encountered String')
        self._check(hge_ctx, endpoint, self._sign(hge_ctx), claims_namespace_path)

    def test_jwt_no_default_role(self, hge_ctx, endpoint):
        """Claims without x-hasura-default-role yield 'jwt-missing-role-claims'."""
        claims_namespace_path = self._apply_hasura_claims(hge_ctx, {
            'x-hasura-user-id': '1',
            'x-hasura-allowed-roles': ['user'],
        })
        self._expect_error(endpoint, 'jwt-missing-role-claims',
                           'JWT claim does not contain x-hasura-default-role')
        self._check(hge_ctx, endpoint, self._sign(hge_ctx), claims_namespace_path)

    def test_jwt_expired(self, hge_ctx, endpoint):
        """A token whose 'exp' is one minute in the past yields JWTExpired."""
        claims_namespace_path = self._apply_hasura_claims(hge_ctx, {
            'x-hasura-user-id': '1',
            'x-hasura-default-role': 'user',
            'x-hasura-allowed-roles': ['user'],
        })
        exp = datetime.now() - timedelta(minutes=1)
        self.claims['exp'] = round(exp.timestamp())
        self._expect_error(endpoint, 'invalid-jwt', 'Could not verify JWT: JWTExpired')
        self._check(hge_ctx, endpoint, self._sign(hge_ctx), claims_namespace_path)

    def test_jwt_invalid_signature(self, hge_ctx, endpoint):
        """A token signed with a different key and algorithm is rejected."""
        claims_namespace_path = self._apply_hasura_claims(hge_ctx, {
            'x-hasura-user-id': '1',
            'x-hasura-default-role': 'user',
            'x-hasura-allowed-roles': ['user'],
        })
        wrong_key = gen_rsa_key()
        # Pick an HMAC algorithm that differs from the configured one when
        # the configured one is HS256.
        other_algo = 'HS384' if hge_ctx.hge_jwt_algo == 'HS256' else 'HS256'
        token = jwt.encode(self.claims, wrong_key, algorithm=other_algo)
        self._expect_error(endpoint, 'invalid-jwt',
                           'Could not verify JWT: JWSError JWSInvalidSignature')
        self._check(hge_ctx, endpoint, token, claims_namespace_path)

    def test_jwt_no_audience_in_conf(self, hge_ctx, endpoint):
        """With no 'audience' in the JWT config, a token carrying 'aud' is
        checked against the query config's own expected response.

        Skipped when the JWT config does define 'audience'.
        """
        jwt_conf = json.loads(hge_ctx.hge_jwt_conf)
        if 'audience' in jwt_conf:
            pytest.skip('audience present in conf, skipping testing no audience')
        claims_namespace_path = self._apply_hasura_claims(hge_ctx, {
            'x-hasura-user-id': '1',
            'x-hasura-default-role': 'user',
            'x-hasura-allowed-roles': ['user'],
        })
        self.claims['aud'] = 'hasura-test-suite'
        self._check(hge_ctx, endpoint, self._sign(hge_ctx), claims_namespace_path)

    def test_jwt_no_issuer_in_conf(self, hge_ctx, endpoint):
        """With no 'issuer' in the JWT config, a token carrying 'iss' is
        checked against the query config's own expected response.

        Skipped when the JWT config does define 'issuer'.
        """
        jwt_conf = json.loads(hge_ctx.hge_jwt_conf)
        if 'issuer' in jwt_conf:
            pytest.skip('issuer present in conf, skipping testing no issuer')
        claims_namespace_path = self._apply_hasura_claims(hge_ctx, {
            'x-hasura-user-id': '1',
            'x-hasura-default-role': 'user',
            'x-hasura-allowed-roles': ['user'],
        })
        self.claims['iss'] = 'rubbish-issuer'
        self._check(hge_ctx, endpoint, self._sign(hge_ctx), claims_namespace_path)

    @pytest.fixture(autouse=True)
    def transact(self, setup):
        """Per-test fixture: load the query config and build the base claims.

        The base claims carry 'sub', 'name', 'iat' (now) and 'exp' (now + 10h).
        """
        self.dir = 'queries/graphql_query/permissions'
        with open(self.dir + '/user_select_query_unpublished_articles.yaml') as c:
            self.conf = yaml.safe_load(c)
        curr_time = datetime.now()
        exp_time = curr_time + timedelta(hours=10)
        self.claims = {
            'sub': '1234567890',
            'name': 'John Doe',
            'iat': math.floor(curr_time.timestamp()),
            'exp': math.floor(exp_time.timestamp())
        }

    @pytest.fixture(scope='class')
    def setup(self, request, hge_ctx):
        """Class-scoped fixture: run setup.yaml before the tests and
        teardown.yaml after them, asserting a 200 status for each."""
        self.dir = 'queries/graphql_query/permissions'
        st_code, resp = hge_ctx.v1q_f(self.dir + '/setup.yaml')
        assert st_code == 200, resp
        yield
        st_code, resp = hge_ctx.v1q_f(self.dir + '/teardown.yaml')
        assert st_code == 200, resp
def gen_rsa_key():
    """Generate a new 2048-bit RSA private key and return it PEM-encoded.

    The key uses public exponent 65537 and is serialised in the
    TraditionalOpenSSL private-key format without encryption.
    """
    key = rsa.generate_private_key(
        public_exponent=65537,
        key_size=2048,
        backend=default_backend(),
    )
    return key.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.TraditionalOpenSSL,
        encryption_algorithm=serialization.NoEncryption(),
    )
class TestSubscriptionJwtExpiry(object):
    """Websocket connection behaviour once the connection's JWT has expired."""

    def test_jwt_expiry(self, hge_ctx, ws_client):
        """Open a websocket with a token expiring 4 seconds from now, wait
        6 seconds, and assert that the remote end closed the connection."""
        now = datetime.now()
        base_claims = {
            'sub': '1234567890',
            'name': 'John Doe',
            'iat': math.floor(now.timestamp())
        }
        self.claims = base_claims

        namespace_path = None
        if 'claims_namespace_path' in hge_ctx.hge_jwt_conf_dict:
            namespace_path = hge_ctx.hge_jwt_conf_dict['claims_namespace_path']
        user_claims = mk_claims(hge_ctx.hge_jwt_conf, {
            'x-hasura-user-id': '1',
            'x-hasura-default-role': 'user',
            'x-hasura-allowed-roles': ['user'],
        })
        self.claims = mk_claims_with_namespace_path(self.claims, user_claims, namespace_path)

        # The token stays valid for only a few seconds.
        expires_at = now + timedelta(seconds=4)
        self.claims['exp'] = round(expires_at.timestamp())
        token = jwt.encode(self.claims, hge_ctx.hge_jwt_key, algorithm=hge_ctx.hge_jwt_algo)

        payload = {'headers': mk_authz_header(hge_ctx.hge_jwt_conf, token)}
        init_ws_conn(hge_ctx, ws_client, payload)

        # Sleep past the expiry before checking the connection state.
        time.sleep(6)
        assert ws_client.remote_closed == True, ws_client.remote_closed
@pytest.mark.parametrize('endpoint', ['/v1/graphql', '/v1alpha1/graphql'])
class TestJwtAudienceCheck():
    """Audience ('aud') validation, run only when the JWT config defines
    'audience'; each test is parametrized over both GraphQL endpoints."""

    def _apply_user_claims(self, hge_ctx):
        """Merge the standard 'user' hasura claims into self.claims.

        Returns the 'claims_namespace_path' from the JWT config, or None when
        the config has no such entry.
        """
        claims_namespace_path = hge_ctx.hge_jwt_conf_dict.get('claims_namespace_path')
        hasura_claims = mk_claims(hge_ctx.hge_jwt_conf, {
            'x-hasura-user-id': '1',
            'x-hasura-default-role': 'user',
            'x-hasura-allowed-roles': ['user'],
        })
        self.claims = mk_claims_with_namespace_path(self.claims, hasura_claims, claims_namespace_path)
        return claims_namespace_path

    def test_jwt_valid_audience(self, hge_ctx, endpoint):
        """A token whose 'aud' matches the configured audience is checked
        against the query config's own expected response."""
        jwt_conf = json.loads(hge_ctx.hge_jwt_conf)
        if 'audience' not in jwt_conf:
            pytest.skip('audience not present in conf, skipping testing audience')
        audience = jwt_conf['audience']
        # The configured audience may be a single string or a list; use the
        # first entry of a list.
        audience = audience if isinstance(audience, str) else audience[0]

        claims_namespace_path = self._apply_user_claims(hge_ctx)
        self.claims['aud'] = audience
        token = jwt.encode(self.claims, hge_ctx.hge_jwt_key, algorithm=hge_ctx.hge_jwt_algo)
        self.conf['headers'].update(mk_authz_header(hge_ctx.hge_jwt_conf, token))
        # Actually hit the parametrized endpoint; without this both runs use
        # whatever URL the loaded query config contains.
        self.conf['url'] = endpoint
        check_query(hge_ctx, self.conf, add_auth=False, claims_namespace_path=claims_namespace_path)

    def test_jwt_invalid_audience(self, hge_ctx, endpoint):
        """A token with a non-matching 'aud' yields JWTNotInAudience."""
        jwt_conf = json.loads(hge_ctx.hge_jwt_conf)
        if 'audience' not in jwt_conf:
            pytest.skip('audience not present in conf, skipping testing audience')

        claims_namespace_path = self._apply_user_claims(hge_ctx)
        self.claims['aud'] = 'rubbish_audience'
        token = jwt.encode(self.claims, hge_ctx.hge_jwt_key, algorithm=hge_ctx.hge_jwt_algo)
        self.conf['headers'].update(mk_authz_header(hge_ctx.hge_jwt_conf, token))
        self.conf['response'] = {
            'errors': [{
                'extensions': {
                    'code': 'invalid-jwt',
                    'path': '$'
                },
                'message': 'Could not verify JWT: JWTNotInAudience'
            }]
        }
        self.conf['url'] = endpoint
        # Expected status for the error is 200 on /v1/graphql and 400 on
        # /v1alpha1/graphql.
        if endpoint == '/v1/graphql':
            self.conf['status'] = 200
        if endpoint == '/v1alpha1/graphql':
            self.conf['status'] = 400
        check_query(hge_ctx, self.conf, add_auth=False, claims_namespace_path=claims_namespace_path)

    @pytest.fixture(autouse=True)
    def transact(self, setup):
        """Per-test fixture: load the query config and build the base claims.

        The base claims carry 'sub', 'name', 'iat' (now) and 'exp' (now + 1h).
        """
        self.dir = 'queries/graphql_query/permissions'
        with open(self.dir + '/user_select_query_unpublished_articles.yaml') as c:
            self.conf = yaml.safe_load(c)
        curr_time = datetime.now()
        exp_time = curr_time + timedelta(hours=1)
        self.claims = {
            'sub': '1234567890',
            'name': 'John Doe',
            'iat': math.floor(curr_time.timestamp()),
            'exp': math.floor(exp_time.timestamp())
        }

    @pytest.fixture(scope='class')
    def setup(self, request, hge_ctx):
        """Class-scoped fixture: run setup.yaml before the tests and
        teardown.yaml after them, asserting a 200 status for each."""
        self.dir = 'queries/graphql_query/permissions'
        st_code, resp = hge_ctx.v1q_f(self.dir + '/setup.yaml')
        assert st_code == 200, resp
        yield
        st_code, resp = hge_ctx.v1q_f(self.dir + '/teardown.yaml')
        assert st_code == 200, resp
@pytest.mark.parametrize('endpoint', ['/v1/graphql', '/v1alpha1/graphql'])
class TestJwtIssuerCheck():
    """Issuer ('iss') validation, run only when the JWT config defines
    'issuer'; each test is parametrized over both GraphQL endpoints."""

    def _apply_user_claims(self, hge_ctx):
        """Merge the standard 'user' hasura claims into self.claims.

        Returns the 'claims_namespace_path' from the JWT config, or None when
        the config has no such entry.
        """
        claims_namespace_path = hge_ctx.hge_jwt_conf_dict.get('claims_namespace_path')
        hasura_claims = mk_claims(hge_ctx.hge_jwt_conf, {
            'x-hasura-user-id': '1',
            'x-hasura-default-role': 'user',
            'x-hasura-allowed-roles': ['user'],
        })
        self.claims = mk_claims_with_namespace_path(self.claims, hasura_claims, claims_namespace_path)
        return claims_namespace_path

    def test_jwt_valid_issuer(self, hge_ctx, endpoint):
        """A token whose 'iss' matches the configured issuer is checked
        against the query config's own expected response."""
        jwt_conf = json.loads(hge_ctx.hge_jwt_conf)
        if 'issuer' not in jwt_conf:
            pytest.skip('issuer not present in conf, skipping testing issuer')
        issuer = jwt_conf['issuer']

        claims_namespace_path = self._apply_user_claims(hge_ctx)
        self.claims['iss'] = issuer
        token = jwt.encode(self.claims, hge_ctx.hge_jwt_key, algorithm=hge_ctx.hge_jwt_algo)
        self.conf['headers'].update(mk_authz_header(hge_ctx.hge_jwt_conf, token))
        # Actually hit the parametrized endpoint; without this both runs use
        # whatever URL the loaded query config contains.
        self.conf['url'] = endpoint
        check_query(hge_ctx, self.conf, add_auth=False, claims_namespace_path=claims_namespace_path)

    def test_jwt_invalid_issuer(self, hge_ctx, endpoint):
        """A token with a non-matching 'iss' yields JWTNotInIssuer."""
        jwt_conf = json.loads(hge_ctx.hge_jwt_conf)
        if 'issuer' not in jwt_conf:
            pytest.skip('issuer not present in conf, skipping testing issuer')

        claims_namespace_path = self._apply_user_claims(hge_ctx)
        self.claims['iss'] = 'rubbish_issuer'
        token = jwt.encode(self.claims, hge_ctx.hge_jwt_key, algorithm=hge_ctx.hge_jwt_algo)
        self.conf['headers'].update(mk_authz_header(hge_ctx.hge_jwt_conf, token))
        self.conf['response'] = {
            'errors': [{
                'extensions': {
                    'code': 'invalid-jwt',
                    'path': '$'
                },
                'message': 'Could not verify JWT: JWTNotInIssuer'
            }]
        }
        self.conf['url'] = endpoint
        # Expected status for the error is 200 on /v1/graphql and 400 on
        # /v1alpha1/graphql.
        if endpoint == '/v1/graphql':
            self.conf['status'] = 200
        if endpoint == '/v1alpha1/graphql':
            self.conf['status'] = 400
        check_query(hge_ctx, self.conf, add_auth=False, claims_namespace_path=claims_namespace_path)

    @pytest.fixture(autouse=True)
    def transact(self, setup):
        """Per-test fixture: load the query config and build the base claims.

        The base claims carry 'sub', 'name', 'iat' (now) and 'exp' (now + 1h).
        """
        self.dir = 'queries/graphql_query/permissions'
        with open(self.dir + '/user_select_query_unpublished_articles.yaml') as c:
            self.conf = yaml.safe_load(c)
        curr_time = datetime.now()
        exp_time = curr_time + timedelta(hours=1)
        self.claims = {
            'sub': '1234567890',
            'name': 'John Doe',
            'iat': math.floor(curr_time.timestamp()),
            'exp': math.floor(exp_time.timestamp())
        }

    @pytest.fixture(scope='class')
    def setup(self, request, hge_ctx):
        """Class-scoped fixture: run setup.yaml before the tests and
        teardown.yaml after them, asserting a 200 status for each."""
        self.dir = 'queries/graphql_query/permissions'
        st_code, resp = hge_ctx.v1q_f(self.dir + '/setup.yaml')
        assert st_code == 200, resp
        yield
        st_code, resp = hge_ctx.v1q_f(self.dir + '/teardown.yaml')
        assert st_code == 200, resp