test jwt with invalid signtaure and expired token (#1492)

This commit is contained in:
Anon Ray 2019-01-28 16:52:43 +00:00 committed by Shahidh K Muhammed
parent 4ae44f7b5d
commit 91c19ecf3d

View File

@ -1,22 +1,24 @@
from datetime import datetime, timedelta
import math
import yaml
import pytest
import jwt
from validate import check_query
if not pytest.config.getoption("--hge-jwt-key-file"):
pytest.skip("--hge-jwt-key-file is missing, skipping JWT basic tests", allow_module_level=True)
if not pytest.config.getoption('--hge-jwt-key-file'):
pytest.skip('--hge-jwt-key-file is missing, skipping JWT basic tests', allow_module_level=True)
class TestJWTBasic:
def test_jwt_invalid_role_in_request_header(self, hge_ctx):
self.claims['https://hasura.io/jwt/claims'] = {
"x-hasura-user-id": "1",
"x-hasura-allowed-roles": ["contractor", "editor"],
"x-hasura-default-role": "contractor"
'x-hasura-user-id': '1',
'x-hasura-allowed-roles': ['contractor', 'editor'],
'x-hasura-default-role': 'contractor'
}
token = jwt.encode(self.claims, hge_ctx.hge_jwt_key, algorithm='RS512' ).decode('UTF-8')
token = jwt.encode(self.claims, hge_ctx.hge_jwt_key, algorithm='RS512').decode('utf-8')
self.conf['headers']['Authorization'] = 'Bearer ' + token
self.conf['response'] = {
'errors': [{
@ -32,10 +34,10 @@ class TestJWTBasic:
def test_jwt_no_allowed_roles_in_claim(self, hge_ctx):
self.claims['https://hasura.io/jwt/claims'] = {
"x-hasura-user-id": "1",
"x-hasura-default-role": "user"
'x-hasura-user-id': '1',
'x-hasura-default-role': 'user'
}
token = jwt.encode(self.claims, hge_ctx.hge_jwt_key, algorithm='RS512' ).decode('UTF-8')
token = jwt.encode(self.claims, hge_ctx.hge_jwt_key, algorithm='RS512').decode('utf-8')
self.conf['headers']['Authorization'] = 'Bearer ' + token
self.conf['response'] = {
'errors': [{
@ -51,11 +53,11 @@ class TestJWTBasic:
def test_jwt_invalid_allowed_roles_in_claim(self, hge_ctx):
self.claims['https://hasura.io/jwt/claims'] = {
"x-hasura-user-id": "1",
"x-hasura-allowed-roles": "user",
"x-hasura-default-role": "user"
'x-hasura-user-id': '1',
'x-hasura-allowed-roles': 'user',
'x-hasura-default-role': 'user'
}
token = jwt.encode(self.claims, hge_ctx.hge_jwt_key, algorithm='RS512' ).decode('UTF-8')
token = jwt.encode(self.claims, hge_ctx.hge_jwt_key, algorithm='RS512').decode('utf-8')
self.conf['headers']['Authorization'] = 'Bearer ' + token
self.conf['response'] = {
'errors': [{
@ -71,10 +73,10 @@ class TestJWTBasic:
def test_jwt_no_default_role(self, hge_ctx):
self.claims['https://hasura.io/jwt/claims'] = {
"x-hasura-user-id": "1",
"x-hasura-allowed-roles": ["user"],
'x-hasura-user-id': '1',
'x-hasura-allowed-roles': ['user'],
}
token = jwt.encode(self.claims, hge_ctx.hge_jwt_key, algorithm='RS512' ).decode('UTF-8')
token = jwt.encode(self.claims, hge_ctx.hge_jwt_key, algorithm='RS512').decode('utf-8')
self.conf['headers']['Authorization'] = 'Bearer ' + token
self.conf['response'] = {
'errors': [{
@ -88,6 +90,49 @@ class TestJWTBasic:
self.conf['status'] = 400
check_query(hge_ctx, self.conf, add_auth=False)
def test_jwt_expired(self, hge_ctx):
self.claims['https://hasura.io/jwt/claims'] = {
'x-hasura-default-role': 'user',
'x-hasura-allowed-roles': ['user'],
}
exp = datetime.now() - timedelta(minutes=1)
self.claims['exp'] = round(exp.timestamp())
token = jwt.encode(self.claims, hge_ctx.hge_jwt_key, algorithm='RS512').decode('utf-8')
self.conf['headers']['Authorization'] = 'Bearer ' + token
self.conf['response'] = {
'errors': [{
'extensions': {
'code': 'invalid-jwt',
'path': '$'
},
'message': 'Could not verify JWT: JWTExpired'
}]
}
self.conf['status'] = 400
check_query(hge_ctx, self.conf, add_auth=False)
def test_jwt_invalid_signature(self, hge_ctx):
self.claims['https://hasura.io/jwt/claims'] = {
'x-hasura-default-role': 'user',
'x-hasura-allowed-roles': ['user'],
}
wrong_key = gen_rsa_key()
token = jwt.encode(self.claims, wrong_key, algorithm='HS256').decode('utf-8')
self.conf['headers']['Authorization'] = 'Bearer ' + token
self.conf['response'] = {
'errors': [{
'extensions': {
'code': 'invalid-jwt',
'path': '$'
},
'message': 'Could not verify JWT: JWSError JWSInvalidSignature'
}]
}
self.conf['status'] = 400
check_query(hge_ctx, self.conf, add_auth=False)
@pytest.fixture(autouse=True)
def transact(self, request, hge_ctx):
self.dir = 'queries/graphql_query/permissions'
@ -95,11 +140,31 @@ class TestJWTBasic:
assert st_code == 200, resp
with open(self.dir + '/user_select_query_unpublished_articles.yaml') as c:
self.conf = yaml.safe_load(c)
curr_time = datetime.now()
exp_time = curr_time + timedelta(hours=1)
self.claims = {
"sub": "1234567890",
"name": "John Doe",
"iat": 1516239022
'sub': '1234567890',
'name': 'John Doe',
'iat': math.floor(curr_time.timestamp()),
'exp': math.floor(exp_time.timestamp())
}
yield
st_code, resp = hge_ctx.v1q_f(self.dir + '/teardown.yaml')
assert st_code == 200, resp
def gen_rsa_key():
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives import serialization
private_key = rsa.generate_private_key(
public_exponent=65537,
key_size=2048,
backend=default_backend()
)
pem = private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption()
)
return pem