mirror of
https://github.com/hasura/graphql-engine.git
synced 2025-01-07 08:13:18 +03:00
444 lines
17 KiB
Python
444 lines
17 KiB
Python
from datetime import datetime, timedelta
|
|
import math
|
|
import json
|
|
import time
|
|
|
|
import yaml
|
|
import pytest
|
|
import jwt
|
|
from test_subscriptions import init_ws_conn
|
|
from cryptography.hazmat.backends import default_backend
|
|
from cryptography.hazmat.primitives.asymmetric import rsa
|
|
from cryptography.hazmat.primitives import serialization
|
|
|
|
from validate import check_query
|
|
|
|
|
|
if not pytest.config.getoption('--hge-jwt-key-file'):
|
|
pytest.skip('--hge-jwt-key-file is missing, skipping JWT tests', allow_module_level=True)
|
|
|
|
if not pytest.config.getoption('--hge-jwt-conf'):
|
|
pytest.skip('--hge-jwt-key-conf is missing, skipping JWT tests', allow_module_level=True)
|
|
|
|
def get_claims_fmt(raw_conf):
|
|
conf = json.loads(raw_conf)
|
|
try:
|
|
claims_fmt = conf['claims_format']
|
|
except KeyError:
|
|
claims_fmt = 'json'
|
|
return claims_fmt
|
|
|
|
def mk_claims(conf, claims):
|
|
claims_fmt = get_claims_fmt(conf)
|
|
if claims_fmt == 'json':
|
|
return claims
|
|
elif claims_fmt == 'stringified_json':
|
|
return json.dumps(claims)
|
|
else:
|
|
return claims
|
|
|
|
@pytest.mark.parametrize('endpoint', ['/v1/graphql', '/v1alpha1/graphql'])
|
|
class TestJWTBasic():
|
|
|
|
def test_jwt_valid_claims_success(self, hge_ctx, endpoint):
|
|
self.claims['https://hasura.io/jwt/claims'] = mk_claims(hge_ctx.hge_jwt_conf, {
|
|
'x-hasura-user-id': '1',
|
|
'x-hasura-allowed-roles': ['user', 'editor'],
|
|
'x-hasura-default-role': 'user'
|
|
})
|
|
token = jwt.encode(self.claims, hge_ctx.hge_jwt_key, algorithm='RS512').decode('utf-8')
|
|
self.conf['headers']['Authorization'] = 'Bearer ' + token
|
|
self.conf['url'] = endpoint
|
|
self.conf['status'] = 200
|
|
check_query(hge_ctx, self.conf, add_auth=False)
|
|
|
|
def test_jwt_invalid_role_in_request_header(self, hge_ctx, endpoint):
|
|
self.claims['https://hasura.io/jwt/claims'] = mk_claims(hge_ctx.hge_jwt_conf, {
|
|
'x-hasura-user-id': '1',
|
|
'x-hasura-allowed-roles': ['contractor', 'editor'],
|
|
'x-hasura-default-role': 'contractor'
|
|
})
|
|
token = jwt.encode(self.claims, hge_ctx.hge_jwt_key, algorithm='RS512').decode('utf-8')
|
|
self.conf['headers']['Authorization'] = 'Bearer ' + token
|
|
self.conf['response'] = {
|
|
'errors': [{
|
|
'extensions': {
|
|
'code': 'access-denied',
|
|
'path': '$'
|
|
},
|
|
'message': 'Your current role is not in allowed roles'
|
|
}]
|
|
}
|
|
self.conf['url'] = endpoint
|
|
if endpoint == '/v1/graphql':
|
|
self.conf['status'] = 200
|
|
if endpoint == '/v1alpha1/graphql':
|
|
self.conf['status'] = 400
|
|
check_query(hge_ctx, self.conf, add_auth=False)
|
|
|
|
def test_jwt_no_allowed_roles_in_claim(self, hge_ctx, endpoint):
|
|
self.claims['https://hasura.io/jwt/claims'] = mk_claims(hge_ctx.hge_jwt_conf, {
|
|
'x-hasura-user-id': '1',
|
|
'x-hasura-default-role': 'user'
|
|
})
|
|
token = jwt.encode(self.claims, hge_ctx.hge_jwt_key, algorithm='RS512').decode('utf-8')
|
|
self.conf['headers']['Authorization'] = 'Bearer ' + token
|
|
self.conf['response'] = {
|
|
'errors': [{
|
|
'extensions': {
|
|
'code': 'jwt-missing-role-claims',
|
|
'path': '$'
|
|
},
|
|
'message': 'JWT claim does not contain x-hasura-allowed-roles'
|
|
}]
|
|
}
|
|
self.conf['url'] = endpoint
|
|
if endpoint == '/v1/graphql':
|
|
self.conf['status'] = 200
|
|
if endpoint == '/v1alpha1/graphql':
|
|
self.conf['status'] = 400
|
|
check_query(hge_ctx, self.conf, add_auth=False)
|
|
|
|
def test_jwt_invalid_allowed_roles_in_claim(self, hge_ctx, endpoint):
|
|
self.claims['https://hasura.io/jwt/claims'] = mk_claims(hge_ctx.hge_jwt_conf, {
|
|
'x-hasura-user-id': '1',
|
|
'x-hasura-allowed-roles': 'user',
|
|
'x-hasura-default-role': 'user'
|
|
})
|
|
token = jwt.encode(self.claims, hge_ctx.hge_jwt_key, algorithm='RS512').decode('utf-8')
|
|
self.conf['headers']['Authorization'] = 'Bearer ' + token
|
|
self.conf['response'] = {
|
|
'errors': [{
|
|
'extensions': {
|
|
'code': 'jwt-invalid-claims',
|
|
'path': '$'
|
|
},
|
|
'message': 'invalid x-hasura-allowed-roles; should be a list of roles'
|
|
}]
|
|
}
|
|
self.conf['url'] = endpoint
|
|
if endpoint == '/v1/graphql':
|
|
self.conf['status'] = 200
|
|
if endpoint == '/v1alpha1/graphql':
|
|
self.conf['status'] = 400
|
|
check_query(hge_ctx, self.conf, add_auth=False)
|
|
|
|
def test_jwt_no_default_role(self, hge_ctx, endpoint):
|
|
self.claims['https://hasura.io/jwt/claims'] = mk_claims(hge_ctx.hge_jwt_conf, {
|
|
'x-hasura-user-id': '1',
|
|
'x-hasura-allowed-roles': ['user'],
|
|
})
|
|
token = jwt.encode(self.claims, hge_ctx.hge_jwt_key, algorithm='RS512').decode('utf-8')
|
|
self.conf['headers']['Authorization'] = 'Bearer ' + token
|
|
self.conf['response'] = {
|
|
'errors': [{
|
|
'extensions': {
|
|
'code': 'jwt-missing-role-claims',
|
|
'path': '$'
|
|
},
|
|
'message': 'JWT claim does not contain x-hasura-default-role'
|
|
}]
|
|
}
|
|
self.conf['url'] = endpoint
|
|
if endpoint == '/v1/graphql':
|
|
self.conf['status'] = 200
|
|
if endpoint == '/v1alpha1/graphql':
|
|
self.conf['status'] = 400
|
|
check_query(hge_ctx, self.conf, add_auth=False)
|
|
|
|
def test_jwt_expired(self, hge_ctx, endpoint):
|
|
self.claims['https://hasura.io/jwt/claims'] = mk_claims(hge_ctx.hge_jwt_conf, {
|
|
'x-hasura-user-id': '1',
|
|
'x-hasura-default-role': 'user',
|
|
'x-hasura-allowed-roles': ['user'],
|
|
})
|
|
exp = datetime.now() - timedelta(minutes=1)
|
|
self.claims['exp'] = round(exp.timestamp())
|
|
|
|
token = jwt.encode(self.claims, hge_ctx.hge_jwt_key, algorithm='RS512').decode('utf-8')
|
|
self.conf['headers']['Authorization'] = 'Bearer ' + token
|
|
self.conf['response'] = {
|
|
'errors': [{
|
|
'extensions': {
|
|
'code': 'invalid-jwt',
|
|
'path': '$'
|
|
},
|
|
'message': 'Could not verify JWT: JWTExpired'
|
|
}]
|
|
}
|
|
self.conf['url'] = endpoint
|
|
if endpoint == '/v1/graphql':
|
|
self.conf['status'] = 200
|
|
if endpoint == '/v1alpha1/graphql':
|
|
self.conf['status'] = 400
|
|
check_query(hge_ctx, self.conf, add_auth=False)
|
|
|
|
def test_jwt_invalid_signature(self, hge_ctx, endpoint):
|
|
self.claims['https://hasura.io/jwt/claims'] = mk_claims(hge_ctx.hge_jwt_conf, {
|
|
'x-hasura-user-id': '1',
|
|
'x-hasura-default-role': 'user',
|
|
'x-hasura-allowed-roles': ['user'],
|
|
})
|
|
|
|
wrong_key = gen_rsa_key()
|
|
token = jwt.encode(self.claims, wrong_key, algorithm='HS256').decode('utf-8')
|
|
self.conf['headers']['Authorization'] = 'Bearer ' + token
|
|
self.conf['response'] = {
|
|
'errors': [{
|
|
'extensions': {
|
|
'code': 'invalid-jwt',
|
|
'path': '$'
|
|
},
|
|
'message': 'Could not verify JWT: JWSError JWSInvalidSignature'
|
|
}]
|
|
}
|
|
self.conf['url'] = endpoint
|
|
if endpoint == '/v1/graphql':
|
|
self.conf['status'] = 200
|
|
if endpoint == '/v1alpha1/graphql':
|
|
self.conf['status'] = 400
|
|
check_query(hge_ctx, self.conf, add_auth=False)
|
|
|
|
def test_jwt_no_audience_in_conf(self, hge_ctx, endpoint):
|
|
jwt_conf = json.loads(hge_ctx.hge_jwt_conf)
|
|
if 'audience' in jwt_conf:
|
|
pytest.skip('audience present in conf, skipping testing no audience')
|
|
|
|
self.claims['https://hasura.io/jwt/claims'] = mk_claims(hge_ctx.hge_jwt_conf, {
|
|
'x-hasura-user-id': '1',
|
|
'x-hasura-default-role': 'user',
|
|
'x-hasura-allowed-roles': ['user'],
|
|
})
|
|
|
|
self.claims['aud'] = 'hasura-test-suite'
|
|
token = jwt.encode(self.claims, hge_ctx.hge_jwt_key, algorithm='RS512').decode('utf-8')
|
|
self.conf['headers']['Authorization'] = 'Bearer ' + token
|
|
self.conf['url'] = endpoint
|
|
check_query(hge_ctx, self.conf, add_auth=False)
|
|
|
|
def test_jwt_no_issuer_in_conf(self, hge_ctx, endpoint):
|
|
jwt_conf = json.loads(hge_ctx.hge_jwt_conf)
|
|
if 'issuer' in jwt_conf:
|
|
pytest.skip('issuer present in conf, skipping testing no issuer')
|
|
|
|
self.claims['https://hasura.io/jwt/claims'] = mk_claims(hge_ctx.hge_jwt_conf, {
|
|
'x-hasura-user-id': '1',
|
|
'x-hasura-default-role': 'user',
|
|
'x-hasura-allowed-roles': ['user'],
|
|
})
|
|
|
|
self.claims['iss'] = 'rubbish-issuer'
|
|
token = jwt.encode(self.claims, hge_ctx.hge_jwt_key, algorithm='RS512').decode('utf-8')
|
|
self.conf['headers']['Authorization'] = 'Bearer ' + token
|
|
self.conf['url'] = endpoint
|
|
check_query(hge_ctx, self.conf, add_auth=False)
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def transact(self, setup):
|
|
self.dir = 'queries/graphql_query/permissions'
|
|
with open(self.dir + '/user_select_query_unpublished_articles.yaml') as c:
|
|
self.conf = yaml.safe_load(c)
|
|
curr_time = datetime.now()
|
|
exp_time = curr_time + timedelta(hours=1)
|
|
self.claims = {
|
|
'sub': '1234567890',
|
|
'name': 'John Doe',
|
|
'iat': math.floor(curr_time.timestamp()),
|
|
'exp': math.floor(exp_time.timestamp())
|
|
}
|
|
|
|
@pytest.fixture(scope='class')
|
|
def setup(self, request, hge_ctx):
|
|
self.dir = 'queries/graphql_query/permissions'
|
|
st_code, resp = hge_ctx.v1q_f(self.dir + '/setup.yaml')
|
|
assert st_code == 200, resp
|
|
yield
|
|
st_code, resp = hge_ctx.v1q_f(self.dir + '/teardown.yaml')
|
|
assert st_code == 200, resp
|
|
|
|
|
|
def gen_rsa_key():
|
|
private_key = rsa.generate_private_key(
|
|
public_exponent=65537,
|
|
key_size=2048,
|
|
backend=default_backend()
|
|
)
|
|
pem = private_key.private_bytes(
|
|
encoding=serialization.Encoding.PEM,
|
|
format=serialization.PrivateFormat.TraditionalOpenSSL,
|
|
encryption_algorithm=serialization.NoEncryption()
|
|
)
|
|
return pem
|
|
|
|
class TestSubscriptionJwtExpiry(object):
|
|
|
|
def test_jwt_expiry(self, hge_ctx, ws_client):
|
|
curr_time = datetime.now()
|
|
self.claims = {
|
|
'sub': '1234567890',
|
|
'name': 'John Doe',
|
|
'iat': math.floor(curr_time.timestamp())
|
|
}
|
|
self.claims['https://hasura.io/jwt/claims'] = mk_claims(hge_ctx.hge_jwt_conf, {
|
|
'x-hasura-user-id': '1',
|
|
'x-hasura-default-role': 'user',
|
|
'x-hasura-allowed-roles': ['user'],
|
|
})
|
|
exp = curr_time + timedelta(seconds=4)
|
|
self.claims['exp'] = round(exp.timestamp())
|
|
token = jwt.encode(self.claims, hge_ctx.hge_jwt_key, algorithm='RS512').decode('utf-8')
|
|
payload = {
|
|
'headers': {
|
|
'Authorization': 'Bearer ' + token
|
|
}
|
|
}
|
|
init_ws_conn(hge_ctx, ws_client, payload)
|
|
time.sleep(6)
|
|
assert ws_client.remote_closed == True, ws_client.remote_closed
|
|
|
|
|
|
@pytest.mark.parametrize('endpoint', ['/v1/graphql', '/v1alpha1/graphql'])
|
|
class TestJwtAudienceCheck():
|
|
def test_jwt_valid_audience(self, hge_ctx, endpoint):
|
|
jwt_conf = json.loads(hge_ctx.hge_jwt_conf)
|
|
if 'audience' not in jwt_conf:
|
|
pytest.skip('audience not present in conf, skipping testing audience')
|
|
|
|
audience = jwt_conf['audience']
|
|
audience = audience if isinstance(audience, str) else audience[0]
|
|
self.claims['https://hasura.io/jwt/claims'] = mk_claims(hge_ctx.hge_jwt_conf, {
|
|
'x-hasura-user-id': '1',
|
|
'x-hasura-default-role': 'user',
|
|
'x-hasura-allowed-roles': ['user'],
|
|
})
|
|
self.claims['aud'] = audience
|
|
|
|
token = jwt.encode(self.claims, hge_ctx.hge_jwt_key, algorithm='RS512').decode('utf-8')
|
|
self.conf['headers']['Authorization'] = 'Bearer ' + token
|
|
check_query(hge_ctx, self.conf, add_auth=False)
|
|
|
|
def test_jwt_invalid_audience(self, hge_ctx, endpoint):
|
|
jwt_conf = json.loads(hge_ctx.hge_jwt_conf)
|
|
if 'audience' not in jwt_conf:
|
|
pytest.skip('audience not present in conf, skipping testing audience')
|
|
|
|
self.claims['https://hasura.io/jwt/claims'] = mk_claims(hge_ctx.hge_jwt_conf, {
|
|
'x-hasura-user-id': '1',
|
|
'x-hasura-default-role': 'user',
|
|
'x-hasura-allowed-roles': ['user'],
|
|
})
|
|
self.claims['aud'] = 'rubbish_audience'
|
|
|
|
token = jwt.encode(self.claims, hge_ctx.hge_jwt_key, algorithm='RS512').decode('utf-8')
|
|
self.conf['headers']['Authorization'] = 'Bearer ' + token
|
|
self.conf['response'] = {
|
|
'errors': [{
|
|
'extensions': {
|
|
'code': 'invalid-jwt',
|
|
'path': '$'
|
|
},
|
|
'message': 'Could not verify JWT: JWTNotInAudience'
|
|
}]
|
|
}
|
|
self.conf['url'] = endpoint
|
|
if endpoint == '/v1/graphql':
|
|
self.conf['status'] = 200
|
|
if endpoint == '/v1alpha1/graphql':
|
|
self.conf['status'] = 400
|
|
check_query(hge_ctx, self.conf, add_auth=False)
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def transact(self, setup):
|
|
self.dir = 'queries/graphql_query/permissions'
|
|
with open(self.dir + '/user_select_query_unpublished_articles.yaml') as c:
|
|
self.conf = yaml.safe_load(c)
|
|
curr_time = datetime.now()
|
|
exp_time = curr_time + timedelta(hours=1)
|
|
self.claims = {
|
|
'sub': '1234567890',
|
|
'name': 'John Doe',
|
|
'iat': math.floor(curr_time.timestamp()),
|
|
'exp': math.floor(exp_time.timestamp())
|
|
}
|
|
|
|
@pytest.fixture(scope='class')
|
|
def setup(self, request, hge_ctx):
|
|
self.dir = 'queries/graphql_query/permissions'
|
|
st_code, resp = hge_ctx.v1q_f(self.dir + '/setup.yaml')
|
|
assert st_code == 200, resp
|
|
yield
|
|
st_code, resp = hge_ctx.v1q_f(self.dir + '/teardown.yaml')
|
|
assert st_code == 200, resp
|
|
|
|
@pytest.mark.parametrize('endpoint', ['/v1/graphql', '/v1alpha1/graphql'])
|
|
class TestJwtIssuerCheck():
|
|
def test_jwt_valid_issuer(self, hge_ctx, endpoint):
|
|
jwt_conf = json.loads(hge_ctx.hge_jwt_conf)
|
|
if 'issuer' not in jwt_conf:
|
|
pytest.skip('issuer not present in conf, skipping testing issuer')
|
|
|
|
issuer = jwt_conf['issuer']
|
|
self.claims['https://hasura.io/jwt/claims'] = mk_claims(hge_ctx.hge_jwt_conf, {
|
|
'x-hasura-user-id': '1',
|
|
'x-hasura-default-role': 'user',
|
|
'x-hasura-allowed-roles': ['user'],
|
|
})
|
|
self.claims['iss'] = issuer
|
|
|
|
token = jwt.encode(self.claims, hge_ctx.hge_jwt_key, algorithm='RS512').decode('utf-8')
|
|
self.conf['headers']['Authorization'] = 'Bearer ' + token
|
|
check_query(hge_ctx, self.conf, add_auth=False)
|
|
|
|
def test_jwt_invalid_issuer(self, hge_ctx, endpoint):
|
|
jwt_conf = json.loads(hge_ctx.hge_jwt_conf)
|
|
if 'issuer' not in jwt_conf:
|
|
pytest.skip('issuer not present in conf, skipping testing issuer')
|
|
|
|
self.claims['https://hasura.io/jwt/claims'] = mk_claims(hge_ctx.hge_jwt_conf, {
|
|
'x-hasura-user-id': '1',
|
|
'x-hasura-default-role': 'user',
|
|
'x-hasura-allowed-roles': ['user'],
|
|
})
|
|
self.claims['iss'] = 'rubbish_issuer'
|
|
|
|
token = jwt.encode(self.claims, hge_ctx.hge_jwt_key, algorithm='RS512').decode('utf-8')
|
|
self.conf['headers']['Authorization'] = 'Bearer ' + token
|
|
self.conf['response'] = {
|
|
'errors': [{
|
|
'extensions': {
|
|
'code': 'invalid-jwt',
|
|
'path': '$'
|
|
},
|
|
'message': 'Could not verify JWT: JWTNotInIssuer'
|
|
}]
|
|
}
|
|
self.conf['url'] = endpoint
|
|
if endpoint == '/v1/graphql':
|
|
self.conf['status'] = 200
|
|
if endpoint == '/v1alpha1/graphql':
|
|
self.conf['status'] = 400
|
|
check_query(hge_ctx, self.conf, add_auth=False)
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def transact(self, setup):
|
|
self.dir = 'queries/graphql_query/permissions'
|
|
with open(self.dir + '/user_select_query_unpublished_articles.yaml') as c:
|
|
self.conf = yaml.safe_load(c)
|
|
curr_time = datetime.now()
|
|
exp_time = curr_time + timedelta(hours=1)
|
|
self.claims = {
|
|
'sub': '1234567890',
|
|
'name': 'John Doe',
|
|
'iat': math.floor(curr_time.timestamp()),
|
|
'exp': math.floor(exp_time.timestamp())
|
|
}
|
|
|
|
@pytest.fixture(scope='class')
|
|
def setup(self, request, hge_ctx):
|
|
self.dir = 'queries/graphql_query/permissions'
|
|
st_code, resp = hge_ctx.v1q_f(self.dir + '/setup.yaml')
|
|
assert st_code == 200, resp
|
|
yield
|
|
st_code, resp = hge_ctx.v1q_f(self.dir + '/teardown.yaml')
|
|
assert st_code == 200, resp
|