# graphql-engine/server/tests-py/test_jwt.py (445 lines, 17 KiB, Python)

from datetime import datetime, timedelta
import math
import json
import time
import ruamel.yaml as yaml
import pytest
import jwt
from test_subscriptions import init_ws_conn
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives import serialization
from validate import check_query
from context import PytestConf
# Skip every test in this module unless both JWT command-line options were
# supplied to pytest. Each message names the option that is actually checked.
if not PytestConf.config.getoption('--hge-jwt-key-file'):
    pytest.skip('--hge-jwt-key-file is missing, skipping JWT tests', allow_module_level=True)

if not PytestConf.config.getoption('--hge-jwt-conf'):
    pytest.skip('--hge-jwt-conf is missing, skipping JWT tests', allow_module_level=True)
def get_claims_fmt(raw_conf):
    """Return the ``claims_format`` entry of a JSON-encoded JWT config.

    Args:
        raw_conf: JSON text of the JWT configuration.

    Returns:
        The value stored under ``'claims_format'``, or ``'json'`` when the
        key is absent.
    """
    parsed = json.loads(raw_conf)
    try:
        return parsed['claims_format']
    except KeyError:
        # No explicit format configured: fall back to plain JSON claims.
        return 'json'
def mk_claims(conf, claims):
    """Shape ``claims`` the way the JWT config's claims format asks for.

    Args:
        conf: JSON text of the JWT configuration (passed to ``get_claims_fmt``).
        claims: The claims object to embed in the token.

    Returns:
        ``json.dumps(claims)`` when the format is ``'stringified_json'``;
        otherwise ``claims`` itself, unchanged.
    """
    # Only the stringified format needs any transformation; 'json' and any
    # other value pass the claims through as-is.
    if get_claims_fmt(conf) == 'stringified_json':
        return json.dumps(claims)
    return claims
@pytest.mark.parametrize('endpoint', ['/v1/graphql', '/v1alpha1/graphql'])
class TestJWTBasic():
    """JWT checks, each run once per GraphQL endpoint in the parametrize list.

    Every test embeds Hasura claims in ``self.claims`` (rebuilt per test by
    ``transact``), signs them, stores the token in the ``Authorization`` header
    of ``self.conf`` and passes the conf to ``check_query``.
    """

    def _use_token(self, hge_ctx, hasura_claims, extra_claims=None, key=None, algorithm='RS512'):
        """Sign ``self.claims`` and install the result as a bearer token.

        Args:
            hge_ctx: Test context providing ``hge_jwt_conf`` and ``hge_jwt_key``.
            hasura_claims: Dict stored (via ``mk_claims``) under the
                ``https://hasura.io/jwt/claims`` key.
            extra_claims: Optional top-level claims applied after the Hasura
                claims (e.g. ``exp``, ``aud``, ``iss``).
            key: Signing key; defaults to ``hge_ctx.hge_jwt_key``.
            algorithm: Algorithm name handed to ``jwt.encode``.
        """
        self.claims['https://hasura.io/jwt/claims'] = mk_claims(hge_ctx.hge_jwt_conf, hasura_claims)
        if extra_claims:
            self.claims.update(extra_claims)
        signing_key = hge_ctx.hge_jwt_key if key is None else key
        token = jwt.encode(self.claims, signing_key, algorithm=algorithm).decode('utf-8')
        self.conf['headers']['Authorization'] = 'Bearer ' + token

    def _expect_error(self, endpoint, code, message):
        """Point the conf at ``endpoint`` and set the error response expected from it."""
        self.conf['response'] = {
            'errors': [{
                'extensions': {
                    'code': code,
                    'path': '$'
                },
                'message': message
            }]
        }
        self.conf['url'] = endpoint
        # The expected HTTP status for an error differs per endpoint.
        if endpoint == '/v1/graphql':
            self.conf['status'] = 200
        if endpoint == '/v1alpha1/graphql':
            self.conf['status'] = 400

    def test_jwt_valid_claims_success(self, hge_ctx, endpoint):
        """A token with complete role claims is expected to yield status 200."""
        self._use_token(hge_ctx, {
            'x-hasura-user-id': '1',
            'x-hasura-allowed-roles': ['user', 'editor'],
            'x-hasura-default-role': 'user'
        })
        self.conf['url'] = endpoint
        self.conf['status'] = 200
        check_query(hge_ctx, self.conf, add_auth=False)

    def test_jwt_invalid_role_in_request_header(self, hge_ctx, endpoint):
        """Allowed roles 'contractor'/'editor' are expected to produce 'access-denied'."""
        self._use_token(hge_ctx, {
            'x-hasura-user-id': '1',
            'x-hasura-allowed-roles': ['contractor', 'editor'],
            'x-hasura-default-role': 'contractor'
        })
        self._expect_error(endpoint, 'access-denied', 'Your current role is not in allowed roles')
        check_query(hge_ctx, self.conf, add_auth=False)

    def test_jwt_no_allowed_roles_in_claim(self, hge_ctx, endpoint):
        """Omitting x-hasura-allowed-roles is expected to produce 'jwt-missing-role-claims'."""
        self._use_token(hge_ctx, {
            'x-hasura-user-id': '1',
            'x-hasura-default-role': 'user'
        })
        self._expect_error(endpoint, 'jwt-missing-role-claims',
                           'JWT claim does not contain x-hasura-allowed-roles')
        check_query(hge_ctx, self.conf, add_auth=False)

    def test_jwt_invalid_allowed_roles_in_claim(self, hge_ctx, endpoint):
        """A string (not a list) for allowed roles is expected to produce 'jwt-invalid-claims'."""
        self._use_token(hge_ctx, {
            'x-hasura-user-id': '1',
            'x-hasura-allowed-roles': 'user',
            'x-hasura-default-role': 'user'
        })
        self._expect_error(endpoint, 'jwt-invalid-claims',
                           'invalid x-hasura-allowed-roles; should be a list of roles')
        check_query(hge_ctx, self.conf, add_auth=False)

    def test_jwt_no_default_role(self, hge_ctx, endpoint):
        """Omitting x-hasura-default-role is expected to produce 'jwt-missing-role-claims'."""
        self._use_token(hge_ctx, {
            'x-hasura-user-id': '1',
            'x-hasura-allowed-roles': ['user'],
        })
        self._expect_error(endpoint, 'jwt-missing-role-claims',
                           'JWT claim does not contain x-hasura-default-role')
        check_query(hge_ctx, self.conf, add_auth=False)

    def test_jwt_expired(self, hge_ctx, endpoint):
        """A token whose ``exp`` lies one minute in the past is expected to be rejected."""
        expired_at = datetime.now() - timedelta(minutes=1)
        self._use_token(
            hge_ctx,
            {
                'x-hasura-user-id': '1',
                'x-hasura-default-role': 'user',
                'x-hasura-allowed-roles': ['user'],
            },
            extra_claims={'exp': round(expired_at.timestamp())},
        )
        self._expect_error(endpoint, 'invalid-jwt', 'Could not verify JWT: JWTExpired')
        check_query(hge_ctx, self.conf, add_auth=False)

    def test_jwt_invalid_signature(self, hge_ctx, endpoint):
        """A token signed with a freshly generated key is expected to fail verification."""
        self._use_token(
            hge_ctx,
            {
                'x-hasura-user-id': '1',
                'x-hasura-default-role': 'user',
                'x-hasura-allowed-roles': ['user'],
            },
            key=gen_rsa_key(),
            algorithm='HS256',
        )
        self._expect_error(endpoint, 'invalid-jwt',
                           'Could not verify JWT: JWSError JWSInvalidSignature')
        check_query(hge_ctx, self.conf, add_auth=False)

    def test_jwt_no_audience_in_conf(self, hge_ctx, endpoint):
        """With no audience configured, a token carrying an ``aud`` claim is checked against the conf as loaded."""
        jwt_conf = json.loads(hge_ctx.hge_jwt_conf)
        if 'audience' in jwt_conf:
            pytest.skip('audience present in conf, skipping testing no audience')
        self._use_token(
            hge_ctx,
            {
                'x-hasura-user-id': '1',
                'x-hasura-default-role': 'user',
                'x-hasura-allowed-roles': ['user'],
            },
            extra_claims={'aud': 'hasura-test-suite'},
        )
        self.conf['url'] = endpoint
        check_query(hge_ctx, self.conf, add_auth=False)

    def test_jwt_no_issuer_in_conf(self, hge_ctx, endpoint):
        """With no issuer configured, a token carrying an ``iss`` claim is checked against the conf as loaded."""
        jwt_conf = json.loads(hge_ctx.hge_jwt_conf)
        if 'issuer' in jwt_conf:
            pytest.skip('issuer present in conf, skipping testing no issuer')
        self._use_token(
            hge_ctx,
            {
                'x-hasura-user-id': '1',
                'x-hasura-default-role': 'user',
                'x-hasura-allowed-roles': ['user'],
            },
            extra_claims={'iss': 'rubbish-issuer'},
        )
        self.conf['url'] = endpoint
        check_query(hge_ctx, self.conf, add_auth=False)

    @pytest.fixture(autouse=True)
    def transact(self, setup):
        """Before each test, load the query conf and build base claims expiring in one hour."""
        self.dir = 'queries/graphql_query/permissions'
        with open(self.dir + '/user_select_query_unpublished_articles.yaml') as c:
            self.conf = yaml.safe_load(c)
        curr_time = datetime.now()
        exp_time = curr_time + timedelta(hours=1)
        self.claims = {
            'sub': '1234567890',
            'name': 'John Doe',
            'iat': math.floor(curr_time.timestamp()),
            'exp': math.floor(exp_time.timestamp())
        }

    @pytest.fixture(scope='class')
    def setup(self, request, hge_ctx):
        """Run ``setup.yaml`` once per class and ``teardown.yaml`` afterwards; both must return 200."""
        self.dir = 'queries/graphql_query/permissions'
        st_code, resp = hge_ctx.v1q_f(self.dir + '/setup.yaml')
        assert st_code == 200, resp
        yield
        st_code, resp = hge_ctx.v1q_f(self.dir + '/teardown.yaml')
        assert st_code == 200, resp
def gen_rsa_key():
    """Generate a new 2048-bit RSA private key and return it PEM-encoded.

    The key is serialized unencrypted in the ``TraditionalOpenSSL`` private
    key format.
    """
    key = rsa.generate_private_key(
        public_exponent=65537,
        key_size=2048,
        backend=default_backend(),
    )
    return key.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.TraditionalOpenSSL,
        encryption_algorithm=serialization.NoEncryption(),
    )
class TestSubscriptionJwtExpiry(object):
    """Websocket behaviour after the JWT used to open the connection expires."""

    def test_jwt_expiry(self, hge_ctx, ws_client):
        """Connect with a token expiring in 4 seconds, wait 6, and expect a remote close."""
        now = datetime.now()
        expires_at = now + timedelta(seconds=4)
        self.claims = {
            'sub': '1234567890',
            'name': 'John Doe',
            'iat': math.floor(now.timestamp()),
            'https://hasura.io/jwt/claims': mk_claims(hge_ctx.hge_jwt_conf, {
                'x-hasura-user-id': '1',
                'x-hasura-default-role': 'user',
                'x-hasura-allowed-roles': ['user'],
            }),
            'exp': round(expires_at.timestamp()),
        }
        signed = jwt.encode(self.claims, hge_ctx.hge_jwt_key, algorithm='RS512').decode('utf-8')
        init_ws_conn(hge_ctx, ws_client, {'headers': {'Authorization': 'Bearer ' + signed}})
        # Sleep past the 4-second expiry before checking the connection state.
        time.sleep(6)
        assert ws_client.remote_closed == True, ws_client.remote_closed
@pytest.mark.parametrize('endpoint', ['/v1/graphql', '/v1alpha1/graphql'])
class TestJwtAudienceCheck():
    """Audience (``aud``) claim checks; skipped when the JWT conf has no ``audience`` key."""

    def test_jwt_valid_audience(self, hge_ctx, endpoint):
        """A token whose ``aud`` matches the configured audience is checked against the conf as loaded."""
        jwt_conf = json.loads(hge_ctx.hge_jwt_conf)
        if 'audience' not in jwt_conf:
            pytest.skip('audience not present in conf, skipping testing audience')

        audience = jwt_conf['audience']
        # The conf value may be a single string or a sequence; use the first entry of a sequence.
        audience = audience if isinstance(audience, str) else audience[0]
        self.claims['https://hasura.io/jwt/claims'] = mk_claims(hge_ctx.hge_jwt_conf, {
            'x-hasura-user-id': '1',
            'x-hasura-default-role': 'user',
            'x-hasura-allowed-roles': ['user'],
        })
        self.claims['aud'] = audience
        token = jwt.encode(self.claims, hge_ctx.hge_jwt_key, algorithm='RS512').decode('utf-8')
        self.conf['headers']['Authorization'] = 'Bearer ' + token
        # Target the parametrized endpoint; without this both runs would use
        # whatever URL the YAML conf carries.
        self.conf['url'] = endpoint
        check_query(hge_ctx, self.conf, add_auth=False)

    def test_jwt_invalid_audience(self, hge_ctx, endpoint):
        """A token with ``aud`` set to 'rubbish_audience' is expected to yield 'JWTNotInAudience'."""
        jwt_conf = json.loads(hge_ctx.hge_jwt_conf)
        if 'audience' not in jwt_conf:
            pytest.skip('audience not present in conf, skipping testing audience')

        self.claims['https://hasura.io/jwt/claims'] = mk_claims(hge_ctx.hge_jwt_conf, {
            'x-hasura-user-id': '1',
            'x-hasura-default-role': 'user',
            'x-hasura-allowed-roles': ['user'],
        })
        self.claims['aud'] = 'rubbish_audience'
        token = jwt.encode(self.claims, hge_ctx.hge_jwt_key, algorithm='RS512').decode('utf-8')
        self.conf['headers']['Authorization'] = 'Bearer ' + token
        self.conf['response'] = {
            'errors': [{
                'extensions': {
                    'code': 'invalid-jwt',
                    'path': '$'
                },
                'message': 'Could not verify JWT: JWTNotInAudience'
            }]
        }
        self.conf['url'] = endpoint
        # The expected HTTP status for an error differs per endpoint.
        if endpoint == '/v1/graphql':
            self.conf['status'] = 200
        if endpoint == '/v1alpha1/graphql':
            self.conf['status'] = 400
        check_query(hge_ctx, self.conf, add_auth=False)

    @pytest.fixture(autouse=True)
    def transact(self, setup):
        """Before each test, load the query conf and build base claims expiring in one hour."""
        self.dir = 'queries/graphql_query/permissions'
        with open(self.dir + '/user_select_query_unpublished_articles.yaml') as c:
            self.conf = yaml.safe_load(c)
        curr_time = datetime.now()
        exp_time = curr_time + timedelta(hours=1)
        self.claims = {
            'sub': '1234567890',
            'name': 'John Doe',
            'iat': math.floor(curr_time.timestamp()),
            'exp': math.floor(exp_time.timestamp())
        }

    @pytest.fixture(scope='class')
    def setup(self, request, hge_ctx):
        """Run ``setup.yaml`` once per class and ``teardown.yaml`` afterwards; both must return 200."""
        self.dir = 'queries/graphql_query/permissions'
        st_code, resp = hge_ctx.v1q_f(self.dir + '/setup.yaml')
        assert st_code == 200, resp
        yield
        st_code, resp = hge_ctx.v1q_f(self.dir + '/teardown.yaml')
        assert st_code == 200, resp
@pytest.mark.parametrize('endpoint', ['/v1/graphql', '/v1alpha1/graphql'])
class TestJwtIssuerCheck():
    """Issuer (``iss``) claim checks; skipped when the JWT conf has no ``issuer`` key."""

    def test_jwt_valid_issuer(self, hge_ctx, endpoint):
        """A token whose ``iss`` equals the configured issuer is checked against the conf as loaded."""
        jwt_conf = json.loads(hge_ctx.hge_jwt_conf)
        if 'issuer' not in jwt_conf:
            pytest.skip('issuer not present in conf, skipping testing issuer')

        issuer = jwt_conf['issuer']
        self.claims['https://hasura.io/jwt/claims'] = mk_claims(hge_ctx.hge_jwt_conf, {
            'x-hasura-user-id': '1',
            'x-hasura-default-role': 'user',
            'x-hasura-allowed-roles': ['user'],
        })
        self.claims['iss'] = issuer
        token = jwt.encode(self.claims, hge_ctx.hge_jwt_key, algorithm='RS512').decode('utf-8')
        self.conf['headers']['Authorization'] = 'Bearer ' + token
        # Target the parametrized endpoint; without this both runs would use
        # whatever URL the YAML conf carries.
        self.conf['url'] = endpoint
        check_query(hge_ctx, self.conf, add_auth=False)

    def test_jwt_invalid_issuer(self, hge_ctx, endpoint):
        """A token with ``iss`` set to 'rubbish_issuer' is expected to yield 'JWTNotInIssuer'."""
        jwt_conf = json.loads(hge_ctx.hge_jwt_conf)
        if 'issuer' not in jwt_conf:
            pytest.skip('issuer not present in conf, skipping testing issuer')

        self.claims['https://hasura.io/jwt/claims'] = mk_claims(hge_ctx.hge_jwt_conf, {
            'x-hasura-user-id': '1',
            'x-hasura-default-role': 'user',
            'x-hasura-allowed-roles': ['user'],
        })
        self.claims['iss'] = 'rubbish_issuer'
        token = jwt.encode(self.claims, hge_ctx.hge_jwt_key, algorithm='RS512').decode('utf-8')
        self.conf['headers']['Authorization'] = 'Bearer ' + token
        self.conf['response'] = {
            'errors': [{
                'extensions': {
                    'code': 'invalid-jwt',
                    'path': '$'
                },
                'message': 'Could not verify JWT: JWTNotInIssuer'
            }]
        }
        self.conf['url'] = endpoint
        # The expected HTTP status for an error differs per endpoint.
        if endpoint == '/v1/graphql':
            self.conf['status'] = 200
        if endpoint == '/v1alpha1/graphql':
            self.conf['status'] = 400
        check_query(hge_ctx, self.conf, add_auth=False)

    @pytest.fixture(autouse=True)
    def transact(self, setup):
        """Before each test, load the query conf and build base claims expiring in one hour."""
        self.dir = 'queries/graphql_query/permissions'
        with open(self.dir + '/user_select_query_unpublished_articles.yaml') as c:
            self.conf = yaml.safe_load(c)
        curr_time = datetime.now()
        exp_time = curr_time + timedelta(hours=1)
        self.claims = {
            'sub': '1234567890',
            'name': 'John Doe',
            'iat': math.floor(curr_time.timestamp()),
            'exp': math.floor(exp_time.timestamp())
        }

    @pytest.fixture(scope='class')
    def setup(self, request, hge_ctx):
        """Run ``setup.yaml`` once per class and ``teardown.yaml`` afterwards; both must return 200."""
        self.dir = 'queries/graphql_query/permissions'
        st_code, resp = hge_ctx.v1q_f(self.dir + '/setup.yaml')
        assert st_code == 200, resp
        yield
        st_code, resp = hge_ctx.v1q_f(self.dir + '/teardown.yaml')
        assert st_code == 200, resp