From 91c19ecf3d113f501979a86c561eefca2c61cc4d Mon Sep 17 00:00:00 2001 From: Anon Ray Date: Mon, 28 Jan 2019 16:52:43 +0000 Subject: [PATCH] test jwt with invalid signtaure and expired token (#1492) --- server/tests-py/test_jwt.py | 103 +++++++++++++++++++++++++++++------- 1 file changed, 84 insertions(+), 19 deletions(-) diff --git a/server/tests-py/test_jwt.py b/server/tests-py/test_jwt.py index b0afacff497..66358b4e778 100644 --- a/server/tests-py/test_jwt.py +++ b/server/tests-py/test_jwt.py @@ -1,22 +1,24 @@ +from datetime import datetime, timedelta +import math import yaml import pytest import jwt from validate import check_query -if not pytest.config.getoption("--hge-jwt-key-file"): - pytest.skip("--hge-jwt-key-file is missing, skipping JWT basic tests", allow_module_level=True) +if not pytest.config.getoption('--hge-jwt-key-file'): + pytest.skip('--hge-jwt-key-file is missing, skipping JWT basic tests', allow_module_level=True) class TestJWTBasic: def test_jwt_invalid_role_in_request_header(self, hge_ctx): self.claims['https://hasura.io/jwt/claims'] = { - "x-hasura-user-id": "1", - "x-hasura-allowed-roles": ["contractor", "editor"], - "x-hasura-default-role": "contractor" + 'x-hasura-user-id': '1', + 'x-hasura-allowed-roles': ['contractor', 'editor'], + 'x-hasura-default-role': 'contractor' } - token = jwt.encode(self.claims, hge_ctx.hge_jwt_key, algorithm='RS512' ).decode('UTF-8') + token = jwt.encode(self.claims, hge_ctx.hge_jwt_key, algorithm='RS512').decode('utf-8') self.conf['headers']['Authorization'] = 'Bearer ' + token self.conf['response'] = { 'errors': [{ @@ -32,10 +34,10 @@ class TestJWTBasic: def test_jwt_no_allowed_roles_in_claim(self, hge_ctx): self.claims['https://hasura.io/jwt/claims'] = { - "x-hasura-user-id": "1", - "x-hasura-default-role": "user" + 'x-hasura-user-id': '1', + 'x-hasura-default-role': 'user' } - token = jwt.encode(self.claims, hge_ctx.hge_jwt_key, algorithm='RS512' ).decode('UTF-8') + token = jwt.encode(self.claims, hge_ctx.hge_jwt_key, algorithm='RS512').decode('utf-8') self.conf['headers']['Authorization'] = 'Bearer ' + token self.conf['response'] = { 'errors': [{ @@ -51,11 +53,11 @@ class TestJWTBasic: def test_jwt_invalid_allowed_roles_in_claim(self, hge_ctx): self.claims['https://hasura.io/jwt/claims'] = { - "x-hasura-user-id": "1", - "x-hasura-allowed-roles": "user", - "x-hasura-default-role": "user" + 'x-hasura-user-id': '1', + 'x-hasura-allowed-roles': 'user', + 'x-hasura-default-role': 'user' } - token = jwt.encode(self.claims, hge_ctx.hge_jwt_key, algorithm='RS512' ).decode('UTF-8') + token = jwt.encode(self.claims, hge_ctx.hge_jwt_key, algorithm='RS512').decode('utf-8') self.conf['headers']['Authorization'] = 'Bearer ' + token self.conf['response'] = { 'errors': [{ @@ -71,10 +73,10 @@ class TestJWTBasic: def test_jwt_no_default_role(self, hge_ctx): self.claims['https://hasura.io/jwt/claims'] = { - "x-hasura-user-id": "1", - "x-hasura-allowed-roles": ["user"], + 'x-hasura-user-id': '1', + 'x-hasura-allowed-roles': ['user'], } - token = jwt.encode(self.claims, hge_ctx.hge_jwt_key, algorithm='RS512' ).decode('UTF-8') + token = jwt.encode(self.claims, hge_ctx.hge_jwt_key, algorithm='RS512').decode('utf-8') self.conf['headers']['Authorization'] = 'Bearer ' + token self.conf['response'] = { 'errors': [{ @@ -88,6 +90,49 @@ class TestJWTBasic: self.conf['status'] = 400 check_query(hge_ctx, self.conf, add_auth=False) + def test_jwt_expired(self, hge_ctx): + self.claims['https://hasura.io/jwt/claims'] = { + 'x-hasura-default-role': 'user', + 'x-hasura-allowed-roles': ['user'], + } + exp = datetime.now() - timedelta(minutes=1) + self.claims['exp'] = round(exp.timestamp()) + + token = jwt.encode(self.claims, hge_ctx.hge_jwt_key, algorithm='RS512').decode('utf-8') + self.conf['headers']['Authorization'] = 'Bearer ' + token + self.conf['response'] = { + 'errors': [{ + 'extensions': { + 'code': 'invalid-jwt', + 'path': '$' + }, + 'message': 'Could not verify JWT: JWTExpired' + }] + } + self.conf['status'] = 400 + check_query(hge_ctx, self.conf, add_auth=False) + + def test_jwt_invalid_signature(self, hge_ctx): + self.claims['https://hasura.io/jwt/claims'] = { + 'x-hasura-default-role': 'user', + 'x-hasura-allowed-roles': ['user'], + } + + wrong_key = gen_rsa_key() + token = jwt.encode(self.claims, wrong_key, algorithm='HS256').decode('utf-8') + self.conf['headers']['Authorization'] = 'Bearer ' + token + self.conf['response'] = { + 'errors': [{ + 'extensions': { + 'code': 'invalid-jwt', + 'path': '$' + }, + 'message': 'Could not verify JWT: JWSError JWSInvalidSignature' + }] + } + self.conf['status'] = 400 + check_query(hge_ctx, self.conf, add_auth=False) + @pytest.fixture(autouse=True) def transact(self, request, hge_ctx): self.dir = 'queries/graphql_query/permissions' @@ -95,11 +140,31 @@ class TestJWTBasic: assert st_code == 200, resp with open(self.dir + '/user_select_query_unpublished_articles.yaml') as c: self.conf = yaml.safe_load(c) + curr_time = datetime.now() + exp_time = curr_time + timedelta(hours=1) self.claims = { - "sub": "1234567890", - "name": "John Doe", - "iat": 1516239022 + 'sub': '1234567890', + 'name': 'John Doe', + 'iat': math.floor(curr_time.timestamp()), + 'exp': math.floor(exp_time.timestamp()) } yield st_code, resp = hge_ctx.v1q_f(self.dir + '/teardown.yaml') assert st_code == 200, resp + + +def gen_rsa_key(): + from cryptography.hazmat.backends import default_backend + from cryptography.hazmat.primitives.asymmetric import rsa + from cryptography.hazmat.primitives import serialization + private_key = rsa.generate_private_key( + public_exponent=65537, + key_size=2048, + backend=default_backend() + ) + pem = private_key.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.TraditionalOpenSSL, + encryption_algorithm=serialization.NoEncryption() + ) + return pem