mirror of
https://github.com/hasura/graphql-engine.git
synced 2024-12-15 17:31:56 +03:00
3cb9bab9f1
When we run the HGE server inside the test harness, it needs to run with an admin secret for some tests to make sense. This tags each test that requires an admin secret with `pytest.mark.admin_secret`, which then generates a UUID and injects that into both the server and the test case (if required). It also simplifies the way the test harness picks up an existing admin secret, allowing it to use the environment variable instead of requiring it via a parameter. PR-URL: https://github.com/hasura/graphql-engine-mono/pull/6120 GitOrigin-RevId: 55c5b9e8c99bdad9c8304098444ddb9516749a2c
600 lines
25 KiB
Python
600 lines
25 KiB
Python
from datetime import datetime, timedelta, timezone
|
|
import math
|
|
import json
|
|
import time
|
|
|
|
import pytest
|
|
import jwt
|
|
from test_subscriptions import init_ws_conn
|
|
from cryptography.hazmat.backends import default_backend
|
|
from cryptography.hazmat.primitives.asymmetric import rsa
|
|
from cryptography.hazmat.primitives import serialization
|
|
from ruamel.yaml import YAML
|
|
|
|
from validate import check_query, mk_claims_with_namespace_path
|
|
from context import PytestConf
|
|
|
|
# Shared loader for the YAML query fixtures used by the test classes below
# (ruamel.yaml "safe" loader, pure-Python implementation).
yaml = YAML(typ='safe', pure=True)

# The tests below sign tokens and read the JWT configuration, so the whole
# module is skipped when either command-line option is missing.
if not PytestConf.config.getoption('--hge-jwt-key-file'):
    pytest.skip('--hge-jwt-key-file is missing, skipping JWT tests', allow_module_level=True)

if not PytestConf.config.getoption('--hge-jwt-conf'):
    # The message names the option that is actually checked on the line above
    # (it previously referred to a non-existent '--hge-jwt-key-conf').
    pytest.skip('--hge-jwt-conf is missing, skipping JWT tests', allow_module_level=True)
|
|
|
|
def get_claims_fmt(raw_conf):
    """Return the claims format named by a JWT configuration.

    Args:
        raw_conf: The JWT configuration as a JSON-encoded string.

    Returns:
        The value of the top-level ``claims_format`` key, or ``'json'`` when
        the configuration has no such key.
    """
    return json.loads(raw_conf).get('claims_format', 'json')
|
|
|
|
def mk_claims(conf, claims):
    """Encode the Hasura claims the way the JWT configuration asks for.

    Args:
        conf: The JWT configuration as a JSON-encoded string.
        claims: The Hasura claims to be embedded in a token.

    Returns:
        ``claims`` serialised to a JSON string when the configured claims
        format is ``'stringified_json'``; otherwise ``claims`` unchanged.
    """
    if get_claims_fmt(conf) == 'stringified_json':
        return json.dumps(claims)
    # 'json' and every other format value pass the claims through as-is.
    return claims
|
|
|
|
def get_header_fmt(raw_conf):
    """Work out which HTTP header carries the JWT.

    Args:
        raw_conf: The JWT configuration as a JSON-encoded string.

    Returns:
        A ``(header_type, cookie_name)`` tuple. ``header_type`` is
        ``'Authorization'`` or ``'Cookie'``; ``cookie_name`` is
        ``conf['header']['name']`` for cookies and ``None`` otherwise.
        When a required key is missing from the configuration the result
        defaults to ``('Authorization', None)``.

    Raises:
        Exception: If ``conf['header']['type']`` is neither
            ``'Authorization'`` nor ``'Cookie'``.
    """
    conf = json.loads(raw_conf)
    # Only the dictionary lookups are guarded; any missing key (including a
    # Cookie header without a name) falls back to the Authorization default.
    try:
        header_conf = conf['header']
        header_type = header_conf['type']
        cookie_name = header_conf['name'] if header_type == 'Cookie' else None
    except KeyError:
        print('header conf not found in JWT conf, defaulting to Authorization')
        return ('Authorization', None)

    if header_type in ('Authorization', 'Cookie'):
        return (header_type, cookie_name)
    raise Exception('Invalid JWT header format: %s' % conf)
|
|
|
|
def mk_authz_header(conf, token):
    """Build the request header that carries ``token``.

    Args:
        conf: The JWT configuration as a JSON-encoded string.
        token: The encoded JWT.

    Returns:
        A single-entry dict: ``{'Cookie': '<name>=<token>'}`` when the
        configuration selects a cookie, or
        ``{'Authorization': 'Bearer <token>'}`` for the Authorization header.

    Raises:
        Exception: If ``get_header_fmt`` yields any other header type.
    """
    header_type, cookie_name = get_header_fmt(conf)
    if header_type == 'Cookie':
        return {'Cookie': cookie_name + '=' + token}
    if header_type == 'Authorization':
        return {'Authorization': 'Bearer ' + token}
    raise Exception('Invalid JWT header format')
|
|
|
|
|
|
@pytest.mark.parametrize('endpoint', ['/v1/graphql', '/v1alpha1/graphql'])
class TestJWTExpirySkew():
    """Expiry-leeway test, run once per GraphQL endpoint.

    The request is loaded from ``user_select_query_unpublished_articles.yaml``
    by the ``transact`` fixture before every test.
    """

    def test_jwt_expiry_leeway(self, hge_ctx, endpoint):
        """Expect a token that expired 30 seconds ago to still be accepted.

        Skipped unless the JWT configuration contains ``allowed_skew``.
        """
        jwt_conf = hge_ctx.hge_jwt_conf_dict
        if 'allowed_skew' not in jwt_conf:
            pytest.skip("This test expects 'allowed_skew' to be set in the JWT config")

        hasura_claims = mk_claims(hge_ctx.hge_jwt_conf, {
            'x-hasura-user-id': '1',
            'x-hasura-default-role': 'user',
            'x-hasura-allowed-roles': ['user'],
        })
        claims_namespace_path = jwt_conf.get('claims_namespace_path')
        self.claims = mk_claims_with_namespace_path(self.claims, hasura_claims, claims_namespace_path)

        # Backdate the expiry by 30 seconds.
        # NOTE(review): presumably the harness configures allowed_skew to be
        # larger than 30 seconds - confirm against the server setup.
        expired_at = datetime.now(timezone.utc) - timedelta(seconds=30)
        self.claims['exp'] = round(expired_at.timestamp())

        token = jwt.encode(self.claims, hge_ctx.hge_jwt_key, algorithm=hge_ctx.hge_jwt_algo)
        # Use the configured header type (Authorization or Cookie), in line
        # with the TestJWTBasic tests, instead of assuming Authorization.
        self.conf['headers'].update(mk_authz_header(hge_ctx.hge_jwt_conf, token))
        self.conf['response'] = {
            'data': {
                'article': [{
                    'id': 1,
                    'title': 'Article 1',
                    'content': 'Sample article content 1',
                    'is_published': False,
                    'author': {
                        'id': 1,
                        'name': 'Author 1'
                    }
                }]
            }
        }
        self.conf['url'] = endpoint
        self.conf['status'] = 200
        print("conf is ", self.conf)
        check_query(hge_ctx, self.conf, add_auth=False, claims_namespace_path=claims_namespace_path)

    @pytest.fixture(autouse=True)
    def transact(self, setup):
        """Load the query fixture and prepare base claims valid for 10 hours."""
        self.dir = 'queries/graphql_query/permissions'
        with open(self.dir + '/user_select_query_unpublished_articles.yaml') as c:
            self.conf = yaml.load(c)
        # An aware UTC clock gives the same epoch seconds as the local clock
        # without depending on the machine's timezone rules.
        curr_time = datetime.now(timezone.utc)
        exp_time = curr_time + timedelta(hours=10)
        self.claims = {
            'sub': '1234567890',
            'name': 'John Doe',
            'iat': math.floor(curr_time.timestamp()),
            'exp': math.floor(exp_time.timestamp())
        }

    @pytest.fixture(scope='class')
    def setup(self, postgis, hge_ctx):
        """Apply ``setup.yaml`` before the class's tests and ``teardown.yaml`` after."""
        self.dir = 'queries/graphql_query/permissions'
        hge_ctx.v1q_f(self.dir + '/setup.yaml')
        yield
        hge_ctx.v1q_f(self.dir + '/teardown.yaml')
|
|
|
|
@pytest.mark.parametrize('endpoint', ['/v1/graphql', '/v1alpha1/graphql'])
class TestJWTBasic():
    """Core JWT authentication tests, run once per GraphQL endpoint.

    Every test merges Hasura claims into the base claims prepared by the
    ``transact`` fixture, signs them, attaches the token to the request loaded
    from ``user_select_query_unpublished_articles.yaml`` and verifies the
    exchange with ``check_query``.
    """

    def _add_hasura_claims(self, hge_ctx, hasura_claims):
        """Merge ``hasura_claims`` into ``self.claims``.

        Returns the configured ``claims_namespace_path`` (``None`` when the
        JWT configuration does not set one) for passing on to ``check_query``.
        """
        encoded = mk_claims(hge_ctx.hge_jwt_conf, hasura_claims)
        claims_namespace_path = hge_ctx.hge_jwt_conf_dict.get('claims_namespace_path')
        self.claims = mk_claims_with_namespace_path(self.claims, encoded, claims_namespace_path)
        return claims_namespace_path

    def _attach_token(self, hge_ctx, key=None, algorithm=None):
        """Sign ``self.claims`` and add the token to the request headers.

        ``key`` and ``algorithm`` default to the ones configured on ``hge_ctx``.
        """
        token = jwt.encode(
            self.claims,
            hge_ctx.hge_jwt_key if key is None else key,
            algorithm=hge_ctx.hge_jwt_algo if algorithm is None else algorithm,
        )
        self.conf['headers'].update(mk_authz_header(hge_ctx.hge_jwt_conf, token))

    def _expect_error(self, endpoint, code, message):
        """Point the request at ``endpoint`` and expect a single GraphQL error.

        The expected HTTP status is 200 for ``/v1/graphql`` and 400 for
        ``/v1alpha1/graphql``; for any other endpoint it is left untouched.
        """
        self.conf['response'] = {
            'errors': [{
                'extensions': {
                    'code': code,
                    'path': '$'
                },
                'message': message
            }]
        }
        self.conf['url'] = endpoint
        if endpoint == '/v1/graphql':
            self.conf['status'] = 200
        elif endpoint == '/v1alpha1/graphql':
            self.conf['status'] = 400

    def test_jwt_valid_claims_success(self, hge_ctx, endpoint):
        """A token with complete role claims gets HTTP 200 and the fixture's response."""
        claims_namespace_path = self._add_hasura_claims(hge_ctx, {
            'x-hasura-user-id': '1',
            'x-hasura-allowed-roles': ['user', 'editor'],
            'x-hasura-default-role': 'user'
        })
        self._attach_token(hge_ctx)
        self.conf['url'] = endpoint
        self.conf['status'] = 200
        check_query(hge_ctx, self.conf, add_auth=False, claims_namespace_path=claims_namespace_path)

    def test_jwt_invalid_role_in_request_header(self, hge_ctx, endpoint):
        """Allowed roles of only 'contractor' and 'editor' yield ``access-denied``.

        NOTE(review): presumably the YAML fixture's request headers ask for a
        role outside this list - confirm against the fixture.
        """
        claims_namespace_path = self._add_hasura_claims(hge_ctx, {
            'x-hasura-user-id': '1',
            'x-hasura-allowed-roles': ['contractor', 'editor'],
            'x-hasura-default-role': 'contractor'
        })
        self._attach_token(hge_ctx)
        self._expect_error(endpoint, 'access-denied', 'Your requested role is not in allowed roles')
        check_query(hge_ctx, self.conf, add_auth=False, claims_namespace_path=claims_namespace_path)

    def test_jwt_no_allowed_roles_in_claim(self, hge_ctx, endpoint):
        """Claims without ``x-hasura-allowed-roles`` yield ``jwt-missing-role-claims``."""
        claims_namespace_path = self._add_hasura_claims(hge_ctx, {
            'x-hasura-user-id': '1',
            'x-hasura-default-role': 'user'
        })
        self._attach_token(hge_ctx)
        self._expect_error(endpoint, 'jwt-missing-role-claims',
                           'JWT claim does not contain x-hasura-allowed-roles')
        check_query(hge_ctx, self.conf, add_auth=False, claims_namespace_path=claims_namespace_path)

    def test_jwt_invalid_allowed_roles_in_claim(self, hge_ctx, endpoint):
        """A string (not a list) in ``x-hasura-allowed-roles`` yields ``jwt-invalid-claims``."""
        claims_namespace_path = self._add_hasura_claims(hge_ctx, {
            'x-hasura-user-id': '1',
            'x-hasura-allowed-roles': 'user',
            'x-hasura-default-role': 'user'
        })
        self._attach_token(hge_ctx)
        self._expect_error(
            endpoint,
            'jwt-invalid-claims',
            'invalid x-hasura-allowed-roles; should be a list of roles: parsing [] failed, expected Array, but encountered String',
        )
        check_query(hge_ctx, self.conf, add_auth=False, claims_namespace_path=claims_namespace_path)

    def test_jwt_no_default_role(self, hge_ctx, endpoint):
        """Claims without ``x-hasura-default-role`` yield ``jwt-missing-role-claims``."""
        claims_namespace_path = self._add_hasura_claims(hge_ctx, {
            'x-hasura-user-id': '1',
            'x-hasura-allowed-roles': ['user'],
        })
        self._attach_token(hge_ctx)
        self._expect_error(endpoint, 'jwt-missing-role-claims',
                           'JWT claim does not contain x-hasura-default-role')
        check_query(hge_ctx, self.conf, add_auth=False, claims_namespace_path=claims_namespace_path)

    def test_jwt_expired(self, hge_ctx, endpoint):
        """A token whose ``exp`` lies one minute in the past yields ``JWTExpired``."""
        claims_namespace_path = self._add_hasura_claims(hge_ctx, {
            'x-hasura-user-id': '1',
            'x-hasura-default-role': 'user',
            'x-hasura-allowed-roles': ['user'],
        })
        expired_at = datetime.now(timezone.utc) - timedelta(minutes=1)
        self.claims['exp'] = round(expired_at.timestamp())

        self._attach_token(hge_ctx)
        self._expect_error(endpoint, 'invalid-jwt', 'Could not verify JWT: JWTExpired')
        check_query(hge_ctx, self.conf, add_auth=False, claims_namespace_path=claims_namespace_path)

    def test_jwt_invalid_signature(self, hge_ctx, endpoint):
        """A token signed with another key and algorithm yields ``JWSInvalidSignature``."""
        claims_namespace_path = self._add_hasura_claims(hge_ctx, {
            'x-hasura-user-id': '1',
            'x-hasura-default-role': 'user',
            'x-hasura-allowed-roles': ['user'],
        })
        wrong_key = gen_rsa_key()
        # Pick an HMAC algorithm that differs from the configured one.
        other_algo = 'HS384' if hge_ctx.hge_jwt_algo == 'HS256' else 'HS256'
        self._attach_token(hge_ctx, key=wrong_key, algorithm=other_algo)
        self._expect_error(endpoint, 'invalid-jwt',
                           'Could not verify JWT: JWSError JWSInvalidSignature')
        check_query(hge_ctx, self.conf, add_auth=False, claims_namespace_path=claims_namespace_path)

    def test_jwt_no_audience_in_conf(self, hge_ctx, endpoint):
        """With no ``audience`` configured, a token carrying an ``aud`` claim
        is checked against the fixture's own expected response.

        Skipped when the JWT configuration sets ``audience``.
        """
        jwt_conf = json.loads(hge_ctx.hge_jwt_conf)
        if 'audience' in jwt_conf:
            pytest.skip('audience present in conf, skipping testing no audience')

        claims_namespace_path = self._add_hasura_claims(hge_ctx, {
            'x-hasura-user-id': '1',
            'x-hasura-default-role': 'user',
            'x-hasura-allowed-roles': ['user'],
        })
        self.claims['aud'] = 'hasura-test-suite'
        self._attach_token(hge_ctx)
        self.conf['url'] = endpoint
        check_query(hge_ctx, self.conf, add_auth=False, claims_namespace_path=claims_namespace_path)

    def test_jwt_no_issuer_in_conf(self, hge_ctx, endpoint):
        """With no ``issuer`` configured, a token carrying an ``iss`` claim
        is checked against the fixture's own expected response.

        Skipped when the JWT configuration sets ``issuer``.
        """
        jwt_conf = json.loads(hge_ctx.hge_jwt_conf)
        if 'issuer' in jwt_conf:
            pytest.skip('issuer present in conf, skipping testing no issuer')

        claims_namespace_path = self._add_hasura_claims(hge_ctx, {
            'x-hasura-user-id': '1',
            'x-hasura-default-role': 'user',
            'x-hasura-allowed-roles': ['user'],
        })
        self.claims['iss'] = 'rubbish-issuer'
        self._attach_token(hge_ctx)
        self.conf['url'] = endpoint
        check_query(hge_ctx, self.conf, add_auth=False, claims_namespace_path=claims_namespace_path)

    @pytest.fixture(autouse=True)
    def transact(self, setup):
        """Load the query fixture and prepare base claims valid for 10 hours."""
        self.dir = 'queries/graphql_query/permissions'
        with open(self.dir + '/user_select_query_unpublished_articles.yaml') as c:
            self.conf = yaml.load(c)
        # An aware UTC clock gives the same epoch seconds as the local clock
        # without depending on the machine's timezone rules.
        curr_time = datetime.now(timezone.utc)
        exp_time = curr_time + timedelta(hours=10)
        self.claims = {
            'sub': '1234567890',
            'name': 'John Doe',
            'iat': math.floor(curr_time.timestamp()),
            'exp': math.floor(exp_time.timestamp())
        }

    @pytest.fixture(scope='class')
    def setup(self, postgis, hge_ctx):
        """Apply ``setup.yaml`` before the class's tests and ``teardown.yaml`` after."""
        self.dir = 'queries/graphql_query/permissions'
        hge_ctx.v1q_f(self.dir + '/setup.yaml')
        yield
        hge_ctx.v1q_f(self.dir + '/teardown.yaml')
|
|
|
|
|
|
def gen_rsa_key():
    """Generate a fresh RSA private key and return it PEM-encoded.

    Returns:
        The serialised key: 2048-bit, public exponent 65537, unencrypted,
        in the traditional OpenSSL PEM format.
    """
    key = rsa.generate_private_key(
        public_exponent=65537,
        key_size=2048,
        backend=default_backend(),
    )
    return key.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.TraditionalOpenSSL,
        encryption_algorithm=serialization.NoEncryption(),
    )
|
|
|
|
class TestSubscriptionJwtExpiry:
    """Checks what happens to a websocket connection once its JWT expires."""

    def test_jwt_expiry(self, hge_ctx, hge_key, ws_client):
        """Open a websocket with a token expiring in 4 seconds, wait 6 seconds,
        then expect ``ws_client.remote_closed`` to be true."""
        curr_time = datetime.now(timezone.utc)
        self.claims = {
            'sub': '1234567890',
            'name': 'John Doe',
            'iat': math.floor(curr_time.timestamp())
        }
        hasura_claims = mk_claims(hge_ctx.hge_jwt_conf, {
            'x-hasura-user-id': '1',
            'x-hasura-default-role': 'user',
            'x-hasura-allowed-roles': ['user'],
        })
        claims_namespace_path = hge_ctx.hge_jwt_conf_dict.get('claims_namespace_path')
        self.claims = mk_claims_with_namespace_path(self.claims, hasura_claims, claims_namespace_path)

        expires_at = curr_time + timedelta(seconds=4)
        self.claims['exp'] = round(expires_at.timestamp())
        token = jwt.encode(self.claims, hge_ctx.hge_jwt_key, algorithm=hge_ctx.hge_jwt_algo)

        payload = {'headers': mk_authz_header(hge_ctx.hge_jwt_conf, token)}
        init_ws_conn(hge_key, ws_client, payload)
        # Sleep past the token's 4-second lifetime before checking the socket.
        time.sleep(6)
        assert ws_client.remote_closed == True, ws_client.remote_closed
|
|
|
|
|
|
@pytest.mark.parametrize('endpoint', ['/v1/graphql', '/v1alpha1/graphql'])
class TestJwtAudienceCheck():
    """Audience (``aud``) validation tests, run once per GraphQL endpoint.

    Both tests are skipped when the JWT configuration has no ``audience``.
    """

    def _attach_token(self, hge_ctx, audience):
        """Sign the standard user claims with ``aud`` set to ``audience`` and
        add the token to the request headers.

        Returns the configured ``claims_namespace_path`` (``None`` when unset).
        """
        hasura_claims = mk_claims(hge_ctx.hge_jwt_conf, {
            'x-hasura-user-id': '1',
            'x-hasura-default-role': 'user',
            'x-hasura-allowed-roles': ['user'],
        })
        claims_namespace_path = hge_ctx.hge_jwt_conf_dict.get('claims_namespace_path')
        self.claims = mk_claims_with_namespace_path(self.claims, hasura_claims, claims_namespace_path)
        self.claims['aud'] = audience

        token = jwt.encode(self.claims, hge_ctx.hge_jwt_key, algorithm=hge_ctx.hge_jwt_algo)
        # Use the configured header type (Authorization or Cookie), in line
        # with the TestJWTBasic tests, instead of assuming Authorization.
        self.conf['headers'].update(mk_authz_header(hge_ctx.hge_jwt_conf, token))
        return claims_namespace_path

    def test_jwt_valid_audience(self, hge_ctx, endpoint):
        """A token whose ``aud`` matches the configuration is checked against
        the fixture's own expected response."""
        jwt_conf = json.loads(hge_ctx.hge_jwt_conf)
        if 'audience' not in jwt_conf:
            pytest.skip('audience not present in conf, skipping testing audience')

        # The configured audience is either one string or a list; use the first.
        audience = jwt_conf['audience']
        audience = audience if isinstance(audience, str) else audience[0]

        claims_namespace_path = self._attach_token(hge_ctx, audience)
        # Query the parametrized endpoint; previously the URL was never set,
        # so both parametrizations hit whatever URL the fixture declared.
        self.conf['url'] = endpoint
        check_query(hge_ctx, self.conf, add_auth=False, claims_namespace_path=claims_namespace_path)

    def test_jwt_invalid_audience(self, hge_ctx, endpoint):
        """A token with an unknown ``aud`` yields ``JWTNotInAudience``."""
        jwt_conf = json.loads(hge_ctx.hge_jwt_conf)
        if 'audience' not in jwt_conf:
            pytest.skip('audience not present in conf, skipping testing audience')

        claims_namespace_path = self._attach_token(hge_ctx, 'rubbish_audience')
        self.conf['response'] = {
            'errors': [{
                'extensions': {
                    'code': 'invalid-jwt',
                    'path': '$'
                },
                'message': 'Could not verify JWT: JWTNotInAudience'
            }]
        }
        self.conf['url'] = endpoint
        # The expected status is 200 on /v1/graphql and 400 on /v1alpha1/graphql.
        if endpoint == '/v1/graphql':
            self.conf['status'] = 200
        if endpoint == '/v1alpha1/graphql':
            self.conf['status'] = 400
        check_query(hge_ctx, self.conf, add_auth=False, claims_namespace_path=claims_namespace_path)

    @pytest.fixture(autouse=True)
    def transact(self, setup):
        """Load the query fixture and prepare base claims valid for 1 hour."""
        self.dir = 'queries/graphql_query/permissions'
        with open(self.dir + '/user_select_query_unpublished_articles.yaml') as c:
            self.conf = yaml.load(c)
        # An aware UTC clock gives the same epoch seconds as the local clock
        # without depending on the machine's timezone rules.
        curr_time = datetime.now(timezone.utc)
        exp_time = curr_time + timedelta(hours=1)
        self.claims = {
            'sub': '1234567890',
            'name': 'John Doe',
            'iat': math.floor(curr_time.timestamp()),
            'exp': math.floor(exp_time.timestamp())
        }

    @pytest.fixture(scope='class')
    def setup(self, postgis, hge_ctx):
        """Apply ``setup.yaml`` before the class's tests and ``teardown.yaml`` after."""
        self.dir = 'queries/graphql_query/permissions'
        hge_ctx.v1q_f(self.dir + '/setup.yaml')
        yield
        hge_ctx.v1q_f(self.dir + '/teardown.yaml')
|
|
|
|
@pytest.mark.parametrize('endpoint', ['/v1/graphql', '/v1alpha1/graphql'])
class TestJwtIssuerCheck():
    """Issuer (``iss``) validation tests, run once per GraphQL endpoint.

    Both tests are skipped when the JWT configuration has no ``issuer``.
    """

    def _attach_token(self, hge_ctx, issuer):
        """Sign the standard user claims with ``iss`` set to ``issuer`` and
        add the token to the request headers.

        Returns the configured ``claims_namespace_path`` (``None`` when unset).
        """
        hasura_claims = mk_claims(hge_ctx.hge_jwt_conf, {
            'x-hasura-user-id': '1',
            'x-hasura-default-role': 'user',
            'x-hasura-allowed-roles': ['user'],
        })
        claims_namespace_path = hge_ctx.hge_jwt_conf_dict.get('claims_namespace_path')
        self.claims = mk_claims_with_namespace_path(self.claims, hasura_claims, claims_namespace_path)
        self.claims['iss'] = issuer

        token = jwt.encode(self.claims, hge_ctx.hge_jwt_key, algorithm=hge_ctx.hge_jwt_algo)
        # Use the configured header type (Authorization or Cookie), in line
        # with the TestJWTBasic tests, instead of assuming Authorization.
        self.conf['headers'].update(mk_authz_header(hge_ctx.hge_jwt_conf, token))
        return claims_namespace_path

    def test_jwt_valid_issuer(self, hge_ctx, endpoint):
        """A token whose ``iss`` matches the configuration is checked against
        the fixture's own expected response."""
        jwt_conf = json.loads(hge_ctx.hge_jwt_conf)
        if 'issuer' not in jwt_conf:
            pytest.skip('issuer not present in conf, skipping testing issuer')

        claims_namespace_path = self._attach_token(hge_ctx, jwt_conf['issuer'])
        # Query the parametrized endpoint; previously the URL was never set,
        # so both parametrizations hit whatever URL the fixture declared.
        self.conf['url'] = endpoint
        check_query(hge_ctx, self.conf, add_auth=False, claims_namespace_path=claims_namespace_path)

    def test_jwt_invalid_issuer(self, hge_ctx, endpoint):
        """A token with an unknown ``iss`` yields ``JWTNotInIssuer``."""
        jwt_conf = json.loads(hge_ctx.hge_jwt_conf)
        if 'issuer' not in jwt_conf:
            pytest.skip('issuer not present in conf, skipping testing issuer')

        claims_namespace_path = self._attach_token(hge_ctx, 'rubbish_issuer')
        self.conf['response'] = {
            'errors': [{
                'extensions': {
                    'code': 'invalid-jwt',
                    'path': '$'
                },
                'message': 'Could not verify JWT: JWTNotInIssuer'
            }]
        }
        self.conf['url'] = endpoint
        # The expected status is 200 on /v1/graphql and 400 on /v1alpha1/graphql.
        if endpoint == '/v1/graphql':
            self.conf['status'] = 200
        if endpoint == '/v1alpha1/graphql':
            self.conf['status'] = 400
        check_query(hge_ctx, self.conf, add_auth=False, claims_namespace_path=claims_namespace_path)

    @pytest.fixture(autouse=True)
    def transact(self, setup):
        """Load the query fixture and prepare base claims valid for 1 hour."""
        self.dir = 'queries/graphql_query/permissions'
        with open(self.dir + '/user_select_query_unpublished_articles.yaml') as c:
            self.conf = yaml.load(c)
        # An aware UTC clock gives the same epoch seconds as the local clock
        # without depending on the machine's timezone rules.
        curr_time = datetime.now(timezone.utc)
        exp_time = curr_time + timedelta(hours=1)
        self.claims = {
            'sub': '1234567890',
            'name': 'John Doe',
            'iat': math.floor(curr_time.timestamp()),
            'exp': math.floor(exp_time.timestamp())
        }

    @pytest.fixture(scope='class')
    def setup(self, postgis, hge_ctx):
        """Apply ``setup.yaml`` before the class's tests and ``teardown.yaml`` after."""
        self.dir = 'queries/graphql_query/permissions'
        hge_ctx.v1q_f(self.dir + '/setup.yaml')
        yield
        hge_ctx.v1q_f(self.dir + '/teardown.yaml')
|