graphql-engine/server/tests-py/test_jwt_claims_map.py
Samir Talwar 26e03a07bb server/tests-py: Parallelize JWT tests.
This rewrites the JWT tests to generate and specify the secrets per test class, and to provide the server configuration to the HGE fixture.

It covers the tests in:

  - *test_jwt.py*
  - *test_jwt_claims_map.py*
  - *test_config_api.py*
  - *test_graphql_queries.py* (just a couple here)

This does reduce the number of code paths exercised with JWT, as we were previously running *all* tests with JWT tokens. However, this seems excessive; we don't need to tread every code path, just enough to ensure we handle the tokens appropriately. I believe that the test coverage in *test_jwt.py* does this well enough (though I'd prefer if we moved the coverage lower down in the stack as unit tests).

These tests were configured in multiple different ways by *test-server.sh*; this configuration is now moved to test subclasses within the various files. This results in a bit of duplication.

Unfortunately, the tests would ideally use parameterization rather than subclassing, but that doesn't work because of `hge_fixture_env`, which creates a "soft" dependency between the environment variables and `hge_server`. Parameterizing the former *should* force the latter to be recreated for each new set of environment variables, but `hge_server` isn't actually aware there's a dependency.

It currently looks like this adds lines of code; we'll more than make up for it when we delete the relevant lines from *test-server.sh*. I am not doing that here because I plan on deleting the whole file in a subsequent changeset.

[NDAT-538]: https://hasurahq.atlassian.net/browse/NDAT-538?atlOrigin=eyJpIjoiNWRkNTljNzYxNjVmNDY3MDlhMDU5Y2ZhYzA5YTRkZjUiLCJwIjoiZ2l0aHViLWNvbS1KU1cifQ

PR-URL: https://github.com/hasura/graphql-engine-mono/pull/8803
GitOrigin-RevId: f7f2caa62de0b0a45e42964b69a8ae73d1575fe8
2023-04-19 10:30:21 +00:00

336 lines
13 KiB
Python

from datetime import datetime, timedelta
import math
import jwt
import pytest
from ruamel.yaml import YAML
from validate import check_query
# Module-wide YAML parser; the `transact` fixtures use it to load the query
# configuration that each test then mutates.
yaml = YAML(
    typ='safe',
    pure=True,
)
# Claims map in which each Hasura claim is located in the token through a JSON
# path under the 'https://myapp.com/jwt/claims' namespace.  No 'default' is
# configured for any of the claims.
basic_claims_map = {
    'x-hasura-user-id': {
        'path': "$.['https://myapp.com/jwt/claims'].user.id",
    },
    'x-hasura-allowed-roles': {
        'path': "$.['https://myapp.com/jwt/claims'].role.allowed",
    },
    'x-hasura-default-role': {
        'path': "$.['https://myapp.com/jwt/claims'].role.default",
    },
}
# Same JSON paths as `basic_claims_map`, but every claim additionally carries
# a 'default' entry.  The tests read these defaults back through
# `jwt_configuration.server_configuration` to decide whether a token that
# lacks the claim is expected to be accepted.
basic_claims_map_with_default_values = {
    'x-hasura-user-id': {
        'path': "$.['https://myapp.com/jwt/claims'].user.id",
        'default': '1',
    },
    'x-hasura-allowed-roles': {
        'path': "$.['https://myapp.com/jwt/claims'].role.allowed",
        'default': ['user', 'editor'],
    },
    'x-hasura-default-role': {
        'path': "$.['https://myapp.com/jwt/claims'].role.default",
        'default': 'user',
    },
}
# Claims map in which only the user id is looked up in the token; the allowed
# roles, the default role and a custom header are given as literal values.
static_claims_map = {
    'x-hasura-user-id': {
        'path': "$.['https://myapp.com/jwt/claims'].user.id",
    },
    'x-hasura-allowed-roles': ['user', 'editor'],
    'x-hasura-default-role': 'user',
    'x-hasura-custom-header': 'custom-value',
}
def clean_null_terms(d):
    """Return a copy of ``d`` without ``None`` values or empty sub-dicts.

    Nested dictionaries are cleaned recursively, and a nested dictionary that
    has nothing left after cleaning is dropped together with its key.  Values
    of any other type are kept as they are: falsy values such as ``0``, ``''``
    or ``[]`` survive, and lists are not descended into.

    ``d`` itself is not modified.
    """
    clean = {}
    for key, value in d.items():
        if isinstance(value, dict):
            nested = clean_null_terms(value)
            # Keep a sub-dict only if something survived the cleaning.
            if nested:
                clean[key] = nested
        elif value is not None:
            clean[key] = value
    return clean
# AbstractTestJWTClaimsMapBasic is run against two different JWT configs:
# one with default values and one without. A "default value" here is the
# value that is used for a claim when looking it up in the JWT with the
# provided JSON path finds nothing.
@pytest.mark.admin_secret
@pytest.mark.parametrize('endpoint', ['/v1/graphql', '/v1alpha1/graphql'])
class AbstractTestJWTClaimsMapBasic:
    """Shared tests for JWT configurations whose ``claims_map`` uses JSON paths.

    Concrete subclasses only attach a ``jwt`` marker.  The tests read the
    signing key, the algorithm and the server-side JWT configuration from the
    ``jwt_configuration`` fixture.  Tests that leave a claim out of the token
    inspect the configured ``default`` for that claim to decide whether the
    request is expected to succeed or to fail.

    Every test is run once per GraphQL endpoint.  For an expected error the
    tests expect HTTP status 200 on ``/v1/graphql`` and 400 on
    ``/v1alpha1/graphql``.
    """

    @pytest.fixture(scope='class')
    def setup(self, postgis, hge_ctx):
        """Apply the permissions schema once per class and tear it down afterwards.

        ``postgis`` is requested as a dependency only; it is not used directly.
        """
        self.dir = 'queries/graphql_query/permissions'
        hge_ctx.v1q_f(self.dir + '/setup.yaml')
        yield
        hge_ctx.v1q_f(self.dir + '/teardown.yaml')

    @pytest.fixture(autouse=True)
    def transact(self, setup):
        """Give each test a fresh query config (``self.conf``) and base claims.

        The base claims are issued now and expire in one hour; the custom
        claims namespace is added later by ``mk_claims``.
        """
        self.dir = 'queries/graphql_query/permissions'
        with open(self.dir + '/user_select_query_unpublished_articles.yaml') as c:
            self.conf = yaml.load(c)
        curr_time = datetime.now()
        exp_time = curr_time + timedelta(hours=1)
        self.claims = {
            'sub': '1234567890',
            'name': 'John Doe',
            'iat': math.floor(curr_time.timestamp()),
            'exp': math.floor(exp_time.timestamp())
        }

    def mk_claims(self, user_id=None, allowed_roles=None, default_role=None):
        """Fill the custom claims namespace of ``self.claims``.

        Arguments left as ``None`` are removed by ``clean_null_terms``, so the
        corresponding claim is absent from the token rather than null.
        """
        self.claims['https://myapp.com/jwt/claims'] = clean_null_terms({
            'user': {
                'id': user_id
            },
            'role': {
                'allowed': allowed_roles,
                'default': default_role
            }
        })

    def _expect_error(self, endpoint, code, message):
        """Make ``self.conf`` expect a single top-level error with ``code``/``message``."""
        self.conf['response'] = {
            'errors': [{
                'extensions': {
                    'code': code,
                    'path': '$'
                },
                'message': message
            }]
        }
        # The two endpoints report the same error with different HTTP statuses.
        if endpoint == '/v1/graphql':
            self.conf['status'] = 200
        elif endpoint == '/v1alpha1/graphql':
            self.conf['status'] = 400

    def _send(self, hge_ctx, jwt_configuration, endpoint):
        """Sign ``self.claims``, attach the bearer token and run the query on ``endpoint``."""
        token = jwt.encode(
            self.claims,
            jwt_configuration.private_key,
            algorithm=jwt_configuration.algorithm,
        )
        self.conf['headers']['Authorization'] = 'Bearer ' + token
        self.conf['url'] = endpoint
        # add_auth=False: the request must be authorised by the JWT alone.
        check_query(hge_ctx, self.conf, add_auth=False)

    def test_jwt_claims_map_valid_claims_success(self, hge_ctx, jwt_configuration, endpoint):
        self.mk_claims('1', ['user', 'editor'], 'user')
        self.conf['status'] = 200
        self._send(hge_ctx, jwt_configuration, endpoint)

    def test_jwt_claims_map_invalid_role_in_request_header(self, hge_ctx, jwt_configuration, endpoint):
        # The role requested by the loaded query config is presumably not one
        # of the roles granted here -- verify against the YAML fixture.
        self.mk_claims('1', ['contractor', 'editor'], 'contractor')
        self._expect_error(
            endpoint,
            'access-denied',
            'Your requested role is not in allowed roles',
        )
        self._send(hge_ctx, jwt_configuration, endpoint)

    def test_jwt_claims_map_no_allowed_roles_in_claim(self, hge_ctx, jwt_configuration, endpoint):
        self.mk_claims('1', None, 'user')
        claims_map = jwt_configuration.server_configuration['claims_map']
        default_allowed_roles = claims_map['x-hasura-allowed-roles'].get('default')
        if default_allowed_roles is None:
            self._expect_error(
                endpoint,
                'jwt-missing-role-claims',
                'JWT claim does not contain x-hasura-allowed-roles',
            )
        else:
            # A configured default stands in for the missing claim.
            self.conf['status'] = 200
        self._send(hge_ctx, jwt_configuration, endpoint)

    def test_jwt_claims_map_invalid_allowed_roles_in_claim(self, hge_ctx, jwt_configuration, endpoint):
        # Allowed roles are deliberately a string instead of a list.
        self.mk_claims('1', 'user', 'user')
        self._expect_error(
            endpoint,
            'jwt-invalid-claims',
            'invalid x-hasura-allowed-roles; should be a list of roles: parsing [] failed, expected Array, but encountered String',
        )
        self._send(hge_ctx, jwt_configuration, endpoint)

    def test_jwt_claims_map_no_default_role(self, hge_ctx, jwt_configuration, endpoint):
        # default_default_role is the default default role set in the JWT config
        # when the lookup with the JSONPath fails, this is the value that will
        # be used for the `x-hasura-default-role` claim
        claims_map = jwt_configuration.server_configuration['claims_map']
        default_default_role = claims_map['x-hasura-default-role'].get('default')
        self.mk_claims('1', ['user'])
        if default_default_role is None:
            self._expect_error(
                endpoint,
                'jwt-missing-role-claims',
                'JWT claim does not contain x-hasura-default-role',
            )
        else:
            self.conf['status'] = 200
        self._send(hge_ctx, jwt_configuration, endpoint)

    def test_jwt_claims_map_claim_not_found(self, hge_ctx, jwt_configuration, endpoint):
        claims_map = jwt_configuration.server_configuration['claims_map']
        default_user_id = claims_map['x-hasura-user-id'].get('default')
        self.mk_claims(None, ['user', 'editor'], 'user')
        if default_user_id is None:
            self._expect_error(
                endpoint,
                'jwt-invalid-claims',
                'JWT claim from claims_map, x-hasura-user-id not found',
            )
        else:
            self.conf['status'] = 200
        self._send(hge_ctx, jwt_configuration, endpoint)
@pytest.mark.jwt('rsa', {'claims_map': basic_claims_map})
class TestJWTClaimsMapBasicWithRSA(AbstractTestJWTClaimsMapBasic):
    """Basic claims-map tests under the 'rsa' JWT marker, without default claim values."""
@pytest.mark.jwt('ed25519', {'claims_map': basic_claims_map})
class TestJWTClaimsMapBasicWithEd25519(AbstractTestJWTClaimsMapBasic):
    """Basic claims-map tests under the 'ed25519' JWT marker, without default claim values."""
@pytest.mark.jwt('rsa', {'claims_map': basic_claims_map_with_default_values})
class TestJWTClaimsMapBasicWithRSAAndDefaultValues(AbstractTestJWTClaimsMapBasic):
    """Basic claims-map tests under the 'rsa' JWT marker, with a default for every claim."""
@pytest.mark.jwt('ed25519', {'claims_map': basic_claims_map_with_default_values})
class TestJWTClaimsMapBasicWithEd25519AndDefaultValues(AbstractTestJWTClaimsMapBasic):
    """Basic claims-map tests under the 'ed25519' JWT marker, with a default for every claim."""
# The values of 'x-hasura-allowed-roles' and 'x-hasura-default-role' have
# been set statically in the JWT config, so the token only carries the user id
@pytest.mark.admin_secret
@pytest.mark.parametrize('endpoint', ['/v1/graphql', '/v1alpha1/graphql'])
class AbstractTestJWTClaimsMapWithStaticHasuraClaimsMapValues:
    """Shared tests for a ``claims_map`` that gives some claims as literal values.

    The tokens built here carry only the user id under the custom claims
    namespace.  Concrete subclasses only attach a ``jwt`` marker; the signing
    key and algorithm are read from the ``jwt_configuration`` fixture.

    Every test is run once per GraphQL endpoint.  For an expected error the
    tests expect HTTP status 200 on ``/v1/graphql`` and 400 on
    ``/v1alpha1/graphql``.
    """

    @pytest.fixture(scope='class')
    def setup(self, postgis, hge_ctx):
        """Apply the permissions schema once per class and tear it down afterwards.

        ``postgis`` is requested as a dependency only; it is not used directly.
        """
        self.dir = 'queries/graphql_query/permissions'
        hge_ctx.v1q_f(self.dir + '/setup.yaml')
        yield
        hge_ctx.v1q_f(self.dir + '/teardown.yaml')

    @pytest.fixture(autouse=True)
    def transact(self, setup):
        """Give each test a fresh query config (``self.conf``) and base claims.

        The base claims are issued now and expire in one hour; the custom
        claims namespace is added later by ``mk_claims``.
        """
        self.dir = 'queries/graphql_query/permissions'
        with open(self.dir + '/user_select_query_unpublished_articles.yaml') as c:
            self.conf = yaml.load(c)
        curr_time = datetime.now()
        exp_time = curr_time + timedelta(hours=1)
        self.claims = {
            'sub': '1234567890',
            'name': 'John Doe',
            'iat': math.floor(curr_time.timestamp()),
            'exp': math.floor(exp_time.timestamp())
        }

    def mk_claims(self, user_id=None):
        """Fill the custom claims namespace of ``self.claims`` with the user id.

        With ``user_id=None`` the namespace ends up empty, because
        ``clean_null_terms`` removes the null id and the then-empty ``user``
        object.
        """
        self.claims['https://myapp.com/jwt/claims'] = clean_null_terms({
            'user': {
                'id': user_id
            }
        })

    def _expect_error(self, endpoint, code, message):
        """Make ``self.conf`` expect a single top-level error with ``code``/``message``."""
        self.conf['response'] = {
            'errors': [{
                'extensions': {
                    'code': code,
                    'path': '$'
                },
                'message': message
            }]
        }
        # The two endpoints report the same error with different HTTP statuses.
        if endpoint == '/v1/graphql':
            self.conf['status'] = 200
        elif endpoint == '/v1alpha1/graphql':
            self.conf['status'] = 400

    def _send(self, hge_ctx, jwt_configuration, endpoint, extra_headers=None):
        """Sign ``self.claims``, attach the bearer token and run the query on ``endpoint``.

        ``extra_headers`` are applied after the ``Authorization`` header.
        """
        token = jwt.encode(
            self.claims,
            jwt_configuration.private_key,
            algorithm=jwt_configuration.algorithm,
        )
        headers = self.conf['headers']
        headers['Authorization'] = 'Bearer ' + token
        if extra_headers:
            headers.update(extra_headers)
        self.conf['url'] = endpoint
        # add_auth=False: the request must be authorised by the JWT alone.
        check_query(hge_ctx, self.conf, add_auth=False)

    def test_jwt_claims_map_valid_claims_success(self, hge_ctx, jwt_configuration, endpoint):
        self.mk_claims('1')
        self.conf['status'] = 200
        self._send(
            hge_ctx,
            jwt_configuration,
            endpoint,
            extra_headers={'x-hasura-custom-header': 'custom-value'},
        )

    def test_jwt_claims_map_invalid_role_in_request_header(self, hge_ctx, jwt_configuration, endpoint):
        self.mk_claims('1')
        self._expect_error(
            endpoint,
            'access-denied',
            'Your requested role is not in allowed roles',
        )
        # Request a role that the tests do not use anywhere else.
        self._send(
            hge_ctx,
            jwt_configuration,
            endpoint,
            extra_headers={'X-Hasura-Role': 'random_string'},
        )

    def test_jwt_claims_map_claim_not_found(self, hge_ctx, jwt_configuration, endpoint):
        self.mk_claims(None)
        self._expect_error(
            endpoint,
            'jwt-invalid-claims',
            'JWT claim from claims_map, x-hasura-user-id not found',
        )
        self._send(hge_ctx, jwt_configuration, endpoint)
@pytest.mark.jwt('rsa', {'claims_map': static_claims_map})
class TestJWTClaimsMapWithStaticHasuraClaimsMapValuesWithRSA(AbstractTestJWTClaimsMapWithStaticHasuraClaimsMapValues):
    """Static-value claims-map tests under the 'rsa' JWT marker."""
@pytest.mark.jwt('ed25519', {'claims_map': static_claims_map})
class TestJWTClaimsMapWithStaticHasuraClaimsMapValuesWithEd25519(AbstractTestJWTClaimsMapWithStaticHasuraClaimsMapValues):
    """Static-value claims-map tests under the 'ed25519' JWT marker."""