2018-09-18 09:21:57 +03:00
#!/usr/bin/env python3
2022-04-22 22:53:12 +03:00
import datetime
2021-08-24 19:25:12 +03:00
import time
2018-09-18 09:21:57 +03:00
import pytest
import json
import queue
2021-07-31 00:41:52 +03:00
from validate import check_query_f
2021-10-29 17:42:07 +03:00
from collections import OrderedDict
2022-04-22 22:53:12 +03:00
from utils import insert_many
2022-01-17 10:39:59 +03:00
from ruamel . yaml import YAML
2018-09-20 04:46:03 +03:00
2020-02-13 12:14:02 +03:00
usefixtures = pytest . mark . usefixtures
2022-01-17 10:39:59 +03:00
yaml = YAML ( typ = ' safe ' , pure = True )
2020-02-13 12:14:02 +03:00
@pytest.fixture ( scope = ' class ' )
def ws_conn_init ( hge_ctx , ws_client ) :
2021-08-24 19:25:12 +03:00
init_ws_conn ( hge_ctx , ws_client )
@pytest.fixture ( scope = ' class ' )
def ws_conn_init_graphql_ws ( hge_ctx , ws_client_graphql_ws ) :
init_graphql_ws_conn ( hge_ctx , ws_client_graphql_ws )
2019-09-30 22:50:57 +03:00
2018-09-20 04:46:03 +03:00
'''
Refer : https : / / github . com / apollographql / subscriptions - transport - ws / blob / master / PROTOCOL . md #gql_connection_init
'''
2018-10-30 12:21:58 +03:00
2019-05-14 09:24:46 +03:00
def init_ws_conn ( hge_ctx , ws_client , payload = None ) :
if payload is None :
payload = { }
if hge_ctx . hge_key is not None :
payload = {
' headers ' : {
' X-Hasura-Admin-Secret ' : hge_ctx . hge_key
}
2018-10-28 21:27:49 +03:00
}
2019-05-14 09:24:46 +03:00
2019-04-08 10:22:38 +03:00
init_msg = {
2018-09-20 04:46:03 +03:00
' type ' : ' connection_init ' ,
2018-10-28 21:27:49 +03:00
' payload ' : payload ,
2018-09-20 04:46:03 +03:00
}
2019-04-08 10:22:38 +03:00
ws_client . send ( init_msg )
ev = ws_client . get_ws_event ( 3 )
2018-09-20 04:46:03 +03:00
assert ev [ ' type ' ] == ' connection_ack ' , ev
2018-09-18 09:21:57 +03:00
2021-08-24 19:25:12 +03:00
def init_graphql_ws_conn ( hge_ctx , ws_client_graphql_ws , payload = None ) :
if payload is None :
payload = { }
if hge_ctx . hge_key is not None :
payload = {
' headers ' : {
' X-Hasura-Admin-Secret ' : hge_ctx . hge_key
}
}
init_msg = {
' type ' : ' connection_init ' ,
' payload ' : payload ,
}
ws_client_graphql_ws . send ( init_msg )
ev = ws_client_graphql_ws . get_ws_event ( 3 )
assert ev [ ' type ' ] == ' connection_ack ' , ev
2019-04-08 10:22:38 +03:00
class TestSubscriptionCtrl ( object ) :
def test_init_without_payload ( self , hge_ctx , ws_client ) :
if hge_ctx . hge_key is not None :
pytest . skip ( " Payload is needed when admin secret is set " )
init_msg = {
' type ' : ' connection_init '
}
ws_client . send ( init_msg )
ev = ws_client . get_ws_event ( 15 )
assert ev [ ' type ' ] == ' connection_ack ' , ev
'''
Refer : https : / / github . com / apollographql / subscriptions - transport - ws / blob / master / PROTOCOL . md #gql_connection_init
'''
def test_init ( self , hge_ctx , ws_client ) :
init_ws_conn ( hge_ctx , ws_client )
'''
Refer : https : / / github . com / apollographql / subscriptions - transport - ws / blob / master / PROTOCOL . md #gql_connection_terminate
'''
def test_connection_terminate ( self , hge_ctx , ws_client ) :
obj = {
' type ' : ' connection_terminate '
}
ws_client . send ( obj )
with pytest . raises ( queue . Empty ) :
ev = ws_client . get_ws_event ( 3 )
2018-10-30 12:21:58 +03:00
2021-05-10 13:17:54 +03:00
@pytest.mark.parametrize ( " backend " , [ ' mssql ' , ' postgres ' ] )
2021-05-25 16:54:18 +03:00
@usefixtures ( ' per_class_tests_db_state ' , ' ws_conn_init ' )
2020-02-13 12:14:02 +03:00
class TestSubscriptionBasic :
2019-09-30 22:50:57 +03:00
@classmethod
def dir ( cls ) :
return ' queries/subscriptions/basic '
2019-04-08 10:22:38 +03:00
2021-07-31 00:41:52 +03:00
@pytest.mark.parametrize ( " transport " , [ ' http ' , ' websocket ' , ' subscription ' ] )
def test_negative ( self , hge_ctx , transport ) :
check_query_f ( hge_ctx , self . dir ( ) + ' /negative_test.yaml ' , transport )
2018-09-18 09:21:57 +03:00
'''
Refer : https : / / github . com / apollographql / subscriptions - transport - ws / blob / master / PROTOCOL . md #gql_connection_error
'''
2018-10-30 12:21:58 +03:00
2019-04-08 10:22:38 +03:00
def test_connection_error ( self , ws_client ) :
ws_client . send ( { ' type ' : ' test ' } )
ev = ws_client . get_ws_event ( 15 )
2018-09-18 09:21:57 +03:00
assert ev [ ' type ' ] == ' connection_error ' , ev
'''
Refer : https : / / github . com / apollographql / subscriptions - transport - ws / blob / master / PROTOCOL . md #gql_start
'''
2018-10-30 12:21:58 +03:00
2019-04-08 10:22:38 +03:00
def test_start ( self , ws_client ) :
2018-09-18 09:21:57 +03:00
query = """
subscription {
2018-10-26 14:57:33 +03:00
hge_tests_test_t1 ( order_by : { c1 : desc } , limit : 1 ) {
2018-09-18 09:21:57 +03:00
c1 ,
c2
}
}
"""
obj = {
' id ' : ' 1 ' ,
' payload ' : {
' query ' : query
} ,
' type ' : ' start '
}
2019-04-08 10:22:38 +03:00
ws_client . send ( obj )
2018-09-18 09:21:57 +03:00
'''
Refer : https : / / github . com / apollographql / subscriptions - transport - ws / blob / master / PROTOCOL . md #gql_data
'''
2019-04-08 10:22:38 +03:00
ev = ws_client . get_ws_query_event ( ' 1 ' , 15 )
2018-09-18 09:21:57 +03:00
assert ev [ ' type ' ] == ' data ' and ev [ ' id ' ] == ' 1 ' , ev
'''
Refer https : / / github . com / apollographql / subscriptions - transport - ws / blob / 01e0 b2b65df07c52f5831cce5c858966ba095993 / src / server . ts #L306
'''
@pytest.mark.skip ( reason = " refer https://github.com/hasura/graphql-engine/pull/387#issuecomment-421343098 " )
2019-04-08 10:22:38 +03:00
def test_start_duplicate ( self , ws_client ) :
self . test_start ( ws_client )
2018-09-18 09:21:57 +03:00
2019-04-08 10:22:38 +03:00
def test_stop_without_id ( self , ws_client ) :
2018-09-18 09:21:57 +03:00
obj = {
' type ' : ' stop '
}
2019-04-08 10:22:38 +03:00
ws_client . send ( obj )
ev = ws_client . get_ws_event ( 3 )
2018-09-18 09:21:57 +03:00
assert ev [ ' type ' ] == ' connection_error ' , ev
'''
Refer https : / / github . com / apollographql / subscriptions - transport - ws / blob / master / PROTOCOL . md #gql_stop
'''
2018-10-30 12:21:58 +03:00
2019-04-08 10:22:38 +03:00
def test_stop ( self , ws_client ) :
2018-09-18 09:21:57 +03:00
obj = {
' type ' : ' stop ' ,
' id ' : ' 1 '
}
2019-04-08 10:22:38 +03:00
ws_client . send ( obj )
2018-09-20 04:46:03 +03:00
with pytest . raises ( queue . Empty ) :
2019-04-08 10:22:38 +03:00
ev = ws_client . get_ws_event ( 3 )
2018-09-18 09:21:57 +03:00
2019-04-08 10:22:38 +03:00
def test_start_after_stop ( self , ws_client ) :
self . test_start ( ws_client )
self . test_stop ( ws_client )
2018-09-18 09:21:57 +03:00
'''
Refer : https : / / github . com / apollographql / subscriptions - transport - ws / blob / master / PROTOCOL . md #gql_complete
'''
2018-10-30 12:21:58 +03:00
2019-04-08 10:22:38 +03:00
def test_complete ( self , hge_ctx , ws_client ) :
2018-09-18 09:21:57 +03:00
query = """
query {
2018-10-26 14:57:33 +03:00
hge_tests_test_t1 ( order_by : { c1 : desc } , limit : 1 ) {
2018-09-18 09:21:57 +03:00
c1 ,
c2
}
}
"""
obj = {
' id ' : ' 2 ' ,
' payload ' : {
' query ' : query
} ,
' type ' : ' start '
}
2019-04-08 10:22:38 +03:00
ws_client . send ( obj )
ev = ws_client . get_ws_query_event ( ' 2 ' , 3 )
2018-09-18 09:21:57 +03:00
assert ev [ ' type ' ] == ' data ' and ev [ ' id ' ] == ' 2 ' , ev
# Check for complete type
2019-04-08 10:22:38 +03:00
ev = ws_client . get_ws_query_event ( ' 2 ' , 3 )
2018-09-18 09:21:57 +03:00
assert ev [ ' type ' ] == ' complete ' and ev [ ' id ' ] == ' 2 ' , ev
2022-03-21 15:14:52 +03:00
## NOTE: The same tests as in TestSubscriptionBasic but with
2021-08-24 19:25:12 +03:00
## the subscription transport being used is `graphql-ws`
## FIXME: There's an issue with the tests being parametrized with both
## postgres and mssql data sources enabled(See issue #2084).
@usefixtures ( ' per_method_tests_db_state ' , ' ws_conn_init_graphql_ws ' )
class TestSubscriptionBasicGraphQLWS :
@classmethod
def dir ( cls ) :
return ' queries/subscriptions/basic '
@pytest.mark.parametrize ( " transport " , [ ' http ' , ' websocket ' , ' subscription ' ] )
def test_negative ( self , hge_ctx , transport ) :
check_query_f ( hge_ctx , self . dir ( ) + ' /negative_test.yaml ' , transport , gqlws = True )
def test_connection_error ( self , hge_ctx , ws_client_graphql_ws ) :
if ws_client_graphql_ws . get_conn_close_state ( ) :
ws_client_graphql_ws . create_conn ( )
if hge_ctx . hge_key == None :
ws_client_graphql_ws . init ( )
else :
ws_client_graphql_ws . init_as_admin ( )
ws_client_graphql_ws . send ( { ' type ' : ' test ' } )
time . sleep ( 2 )
ev = ws_client_graphql_ws . get_conn_close_state ( )
assert ev == True , ev
def test_start ( self , hge_ctx , ws_client_graphql_ws ) :
if ws_client_graphql_ws . get_conn_close_state ( ) :
ws_client_graphql_ws . create_conn ( )
if hge_ctx . hge_key == None :
ws_client_graphql_ws . init ( )
else :
ws_client_graphql_ws . init_as_admin ( )
query = """
subscription {
hge_tests_test_t1 ( order_by : { c1 : desc } , limit : 1 ) {
c1 ,
c2
}
}
"""
obj = {
' id ' : ' 1 ' ,
' payload ' : {
' query ' : query
} ,
' type ' : ' subscribe '
}
ws_client_graphql_ws . send ( obj )
ev = ws_client_graphql_ws . get_ws_query_event ( ' 1 ' , 15 )
assert ev [ ' type ' ] == ' next ' and ev [ ' id ' ] == ' 1 ' , ev
@pytest.mark.skip ( reason = " refer https://github.com/hasura/graphql-engine/pull/387#issuecomment-421343098 " )
def test_start_duplicate ( self , hge_ctx , ws_client_graphql_ws ) :
if ws_client_graphql_ws . get_conn_close_state ( ) :
ws_client_graphql_ws . create_conn ( )
if hge_ctx . hge_key == None :
ws_client_graphql_ws . init ( )
else :
ws_client_graphql_ws . init_as_admin ( )
self . test_start ( ws_client_graphql_ws )
def test_stop_without_id ( self , hge_ctx , ws_client_graphql_ws ) :
if ws_client_graphql_ws . get_conn_close_state ( ) :
ws_client_graphql_ws . create_conn ( )
if hge_ctx . hge_key == None :
ws_client_graphql_ws . init ( )
else :
ws_client_graphql_ws . init_as_admin ( )
obj = {
' type ' : ' complete '
}
ws_client_graphql_ws . send ( obj )
time . sleep ( 2 )
ev = ws_client_graphql_ws . get_conn_close_state ( )
assert ev == True , ev
def test_stop ( self , hge_ctx , ws_client_graphql_ws ) :
if ws_client_graphql_ws . get_conn_close_state ( ) :
ws_client_graphql_ws . create_conn ( )
if hge_ctx . hge_key == None :
ws_client_graphql_ws . init ( )
else :
ws_client_graphql_ws . init_as_admin ( )
obj = {
' type ' : ' complete ' ,
' id ' : ' 1 '
}
ws_client_graphql_ws . send ( obj )
time . sleep ( 2 )
with pytest . raises ( queue . Empty ) :
ev = ws_client_graphql_ws . get_ws_event ( 3 )
def test_start_after_stop ( self , hge_ctx , ws_client_graphql_ws ) :
if ws_client_graphql_ws . get_conn_close_state ( ) :
ws_client_graphql_ws . create_conn ( )
if hge_ctx . hge_key == None :
ws_client_graphql_ws . init ( )
else :
ws_client_graphql_ws . init_as_admin ( )
self . test_start ( hge_ctx , ws_client_graphql_ws )
## NOTE: test_start leaves a message in the queue, hence clearing it
if len ( ws_client_graphql_ws . get_queue ( ) ) > 0 :
ws_client_graphql_ws . clear_queue ( )
self . test_stop ( hge_ctx , ws_client_graphql_ws )
def test_complete ( self , hge_ctx , ws_client_graphql_ws ) :
if ws_client_graphql_ws . get_conn_close_state ( ) :
ws_client_graphql_ws . create_conn ( )
if hge_ctx . hge_key == None :
ws_client_graphql_ws . init ( )
else :
ws_client_graphql_ws . init_as_admin ( )
query = """
query {
hge_tests_test_t1 ( order_by : { c1 : desc } , limit : 1 ) {
c1 ,
c2
}
}
"""
obj = {
' id ' : ' 2 ' ,
' payload ' : {
' query ' : query
} ,
' type ' : ' subscribe '
}
ws_client_graphql_ws . send ( obj )
ev = ws_client_graphql_ws . get_ws_query_event ( ' 2 ' , 3 )
assert ev [ ' type ' ] == ' next ' and ev [ ' id ' ] == ' 2 ' , ev
# Check for complete type
ev = ws_client_graphql_ws . get_ws_query_event ( ' 2 ' , 3 )
assert ev [ ' type ' ] == ' complete ' and ev [ ' id ' ] == ' 2 ' , ev
2018-09-20 04:46:03 +03:00
2020-02-13 12:14:02 +03:00
@usefixtures ( ' per_method_tests_db_state ' , ' ws_conn_init ' )
class TestSubscriptionLiveQueries :
2019-09-30 22:50:57 +03:00
@classmethod
def dir ( cls ) :
return ' queries/subscriptions/live_queries '
2018-09-20 04:46:03 +03:00
2019-04-08 10:22:38 +03:00
def test_live_queries ( self , hge_ctx , ws_client ) :
2018-09-20 04:46:03 +03:00
'''
Create connection using connection_init
'''
2019-04-08 10:22:38 +03:00
ws_client . init_as_admin ( )
2018-09-20 04:46:03 +03:00
2019-01-25 06:31:54 +03:00
with open ( self . dir ( ) + " /steps.yaml " ) as c :
2022-01-17 10:39:59 +03:00
conf = yaml . load ( c )
2018-09-20 04:46:03 +03:00
2019-04-08 10:22:38 +03:00
queryTmplt = """
2019-09-14 09:01:06 +03:00
subscription ( $ result_limit : Int ! ) {
hge_tests_live_query_ { 0 } : hge_tests_test_t2 ( order_by : { c1 : asc } , limit : $ result_limit ) {
2018-09-20 04:46:03 +03:00
c1 ,
c2
}
}
"""
2019-04-08 10:22:38 +03:00
2019-09-14 09:01:06 +03:00
queries = [ ( 0 , 1 ) , ( 1 , 2 ) , ( 2 , 2 ) ]
2019-04-08 10:22:38 +03:00
liveQs = [ ]
2019-09-14 09:01:06 +03:00
for i , resultLimit in queries :
2019-04-08 10:22:38 +03:00
query = queryTmplt . replace ( ' {0} ' , str ( i ) )
headers = { }
if hge_ctx . hge_key is not None :
headers [ ' X-Hasura-Admin-Secret ' ] = hge_ctx . hge_key
2019-09-14 09:01:06 +03:00
subscrPayload = { ' query ' : query , ' variables ' : { ' result_limit ' : resultLimit } }
2019-04-08 10:22:38 +03:00
respLive = ws_client . send_query ( subscrPayload , query_id = ' live_ ' + str ( i ) , headers = headers , timeout = 15 )
liveQs . append ( respLive )
ev = next ( respLive )
assert ev [ ' type ' ] == ' data ' , ev
assert ev [ ' id ' ] == ' live_ ' + str ( i ) , ev
assert ev [ ' payload ' ] [ ' data ' ] == { ' hge_tests_live_query_ ' + str ( i ) : [ ] } , ev [ ' payload ' ] [ ' data ' ]
2018-09-20 04:46:03 +03:00
assert isinstance ( conf , list ) == True , ' Not an list '
for index , step in enumerate ( conf ) :
2019-04-08 10:22:38 +03:00
mutationPayload = { ' query ' : step [ ' query ' ] }
2018-09-20 04:46:03 +03:00
if ' variables ' in step and step [ ' variables ' ] :
2019-04-08 10:22:38 +03:00
mutationPayload [ ' variables ' ] = json . loads ( step [ ' variables ' ] )
2018-09-20 04:46:03 +03:00
expected_resp = json . loads ( step [ ' response ' ] )
2019-04-08 10:22:38 +03:00
mutResp = ws_client . send_query ( mutationPayload , ' mutation_ ' + str ( index ) , timeout = 15 )
ev = next ( mutResp )
assert ev [ ' type ' ] == ' data ' and ev [ ' id ' ] == ' mutation_ ' + str ( index ) , ev
2018-09-20 04:46:03 +03:00
assert ev [ ' payload ' ] [ ' data ' ] == expected_resp , ev [ ' payload ' ] [ ' data ' ]
2019-04-08 10:22:38 +03:00
ev = next ( mutResp )
assert ev [ ' type ' ] == ' complete ' and ev [ ' id ' ] == ' mutation_ ' + str ( index ) , ev
2019-09-14 09:01:06 +03:00
for ( i , resultLimit ) , respLive in zip ( queries , liveQs ) :
2019-04-08 10:22:38 +03:00
ev = next ( respLive )
assert ev [ ' type ' ] == ' data ' , ev
assert ev [ ' id ' ] == ' live_ ' + str ( i ) , ev
2019-09-14 09:01:06 +03:00
expectedReturnedResponse = [ ]
if ' live_response ' in step :
expectedReturnedResponse = json . loads ( step [ ' live_response ' ] )
elif ' returning ' in expected_resp [ step [ ' name ' ] ] :
expectedReturnedResponse = expected_resp [ step [ ' name ' ] ] [ ' returning ' ]
expectedLimitedResponse = expectedReturnedResponse [ : resultLimit ]
expectedLiveResponse = { ' hge_tests_live_query_ ' + str ( i ) : expectedLimitedResponse }
assert ev [ ' payload ' ] [ ' data ' ] == expectedLiveResponse , ev [ ' payload ' ] [ ' data ' ]
for i , _ in queries :
2019-04-08 10:22:38 +03:00
# stop live operation
frame = {
' id ' : ' live_ ' + str ( i ) ,
' type ' : ' stop '
}
ws_client . send ( frame )
2018-09-20 04:46:03 +03:00
2018-09-18 09:21:57 +03:00
with pytest . raises ( queue . Empty ) :
2019-04-08 10:22:38 +03:00
ev = ws_client . get_ws_event ( 3 )
2018-09-20 04:46:03 +03:00
2022-04-22 22:53:12 +03:00
@usefixtures ( ' per_method_tests_db_state ' , ' ws_conn_init ' , ' streaming_subscriptions_fixtures ' )
class TestStreamingSubscription :
@classmethod
def dir ( cls ) :
return ' queries/subscriptions/streaming '
def test_basic_streaming_subscription_existing_static_data ( self , hge_ctx , ws_client ) :
'''
Create connection using connection_init
'''
ws_client . init_as_admin ( )
query = """
subscription ( $ batch_size : Int ! ) {
hge_tests_stream_query : hge_tests_articles_stream ( cursor : { initial_value : { id : 0 } } , batch_size : $ batch_size ) {
id
title
}
}
"""
liveQs = [ ]
headers = { }
articles_to_insert = [ ]
for i in range ( 10 ) :
articles_to_insert . append ( { " id " : i + 1 , " title " : " Article title {} " . format ( i + 1 ) } )
st_code , resp = insert_many ( hge_ctx , { " schema " : " hge_tests " , " name " : " articles " } , articles_to_insert )
assert st_code == 200 , resp
if hge_ctx . hge_key is not None :
headers [ ' X-Hasura-Admin-Secret ' ] = hge_ctx . hge_key
subscrPayload = { ' query ' : query , ' variables ' : { ' batch_size ' : 2 } }
respLive = ws_client . send_query ( subscrPayload , query_id = ' stream_1 ' , headers = headers , timeout = 15 )
liveQs . append ( respLive )
for idx in range ( 5 ) :
ev = next ( respLive )
assert ev [ ' type ' ] == ' data ' , ev
assert ev [ ' id ' ] == ' stream_1 ' , ev
# fetching two rows per batch
expected_payload = [ { " id " : 2 * idx + 1 , " title " : " Article title {} " . format ( 2 * idx + 1 ) } , { " id " : 2 * idx + 2 , " title " : " Article title {} " . format ( 2 * idx + 2 ) } ]
assert ev [ ' payload ' ] [ ' data ' ] == { ' hge_tests_stream_query ' : expected_payload } , ev [ ' payload ' ] [ ' data ' ]
# stop the streaming subscription
frame = {
' id ' : ' stream_1 ' ,
' type ' : ' stop '
}
ws_client . send ( frame )
with pytest . raises ( queue . Empty ) :
ev = ws_client . get_ws_event ( 3 )
def test_streaming_subscriptions_with_concurrent_data_inserts ( self , hge_ctx , ws_client ) :
'''
Create connection using connection_init
'''
ws_client . init_as_admin ( )
headers = { }
query = """
subscription ( $ batch_size : Int ! , $ initial_created_at : timestamptz ! ) {
hge_tests_stream_query : hge_tests_test_t2_stream ( cursor : [ { initial_value : { created_at : $ initial_created_at } , ordering : ASC } ] , batch_size : $ batch_size ) {
c1
c2
}
}
"""
with open ( self . dir ( ) + " /steps.yaml " ) as c :
conf = yaml . load ( c )
subscrPayload = { ' query ' : query , ' variables ' : { ' batch_size ' : 2 , ' initial_created_at ' : " 2020-01-01 " } }
respLive = ws_client . send_query ( subscrPayload , query_id = ' stream_1 ' , headers = headers , timeout = 15 )
assert isinstance ( conf , list ) == True , ' Not an list '
for index , step in enumerate ( conf ) :
mutationPayload = { ' query ' : step [ ' query ' ] }
if ' variables ' in step and step [ ' variables ' ] :
mutationPayload [ ' variables ' ] = json . loads ( step [ ' variables ' ] )
expected_resp = json . loads ( step [ ' response ' ] )
mutResp = ws_client . send_query ( mutationPayload , ' mutation_ ' + str ( index ) , timeout = 15 )
ev = next ( mutResp )
assert ev [ ' type ' ] == ' data ' and ev [ ' id ' ] == ' mutation_ ' + str ( index ) , ev
assert ev [ ' payload ' ] [ ' data ' ] == expected_resp , ev [ ' payload ' ] [ ' data ' ]
ev = next ( mutResp )
assert ev [ ' type ' ] == ' complete ' and ev [ ' id ' ] == ' mutation_ ' + str ( index ) , ev
ev = next ( respLive )
assert ev [ ' type ' ] == ' data ' , ev
assert ev [ ' id ' ] == ' stream_1 ' , ev
expectedReturnedResponse = json . loads ( step [ ' stream_response ' ] )
expectedLiveResponse = { ' hge_tests_stream_query ' : expectedReturnedResponse }
assert ev [ ' payload ' ] [ ' data ' ] == expectedLiveResponse , ev [ ' payload ' ] [ ' data ' ]
# stop the streaming subscription
frame = {
' id ' : ' stream_1 ' ,
' type ' : ' stop '
}
ws_client . send ( frame )
with pytest . raises ( queue . Empty ) :
ev = ws_client . get_ws_event ( 3 )
2021-08-24 19:25:12 +03:00
@usefixtures ( ' per_method_tests_db_state ' , ' ws_conn_init_graphql_ws ' )
class TestSubscriptionLiveQueriesForGraphQLWS :
@classmethod
def dir ( cls ) :
return ' queries/subscriptions/live_queries '
def test_live_queries ( self , hge_ctx , ws_client_graphql_ws ) :
'''
Create connection using connection_init
'''
ws_client_graphql_ws . init_as_admin ( )
with open ( self . dir ( ) + " /steps.yaml " ) as c :
2022-01-17 10:39:59 +03:00
conf = yaml . load ( c )
2021-08-24 19:25:12 +03:00
queryTmplt = """
subscription ( $ result_limit : Int ! ) {
hge_tests_live_query_ { 0 } : hge_tests_test_t2 ( order_by : { c1 : asc } , limit : $ result_limit ) {
c1 ,
c2
}
}
"""
queries = [ ( 0 , 1 ) , ( 1 , 2 ) , ( 2 , 2 ) ]
liveQs = [ ]
for i , resultLimit in queries :
query = queryTmplt . replace ( ' {0} ' , str ( i ) )
headers = { }
if hge_ctx . hge_key is not None :
headers [ ' X-Hasura-Admin-Secret ' ] = hge_ctx . hge_key
subscrPayload = { ' query ' : query , ' variables ' : { ' result_limit ' : resultLimit } }
respLive = ws_client_graphql_ws . send_query ( subscrPayload , query_id = ' live_ ' + str ( i ) , headers = headers , timeout = 15 )
liveQs . append ( respLive )
ev = next ( respLive )
assert ev [ ' type ' ] == ' next ' , ev
assert ev [ ' id ' ] == ' live_ ' + str ( i ) , ev
assert ev [ ' payload ' ] [ ' data ' ] == { ' hge_tests_live_query_ ' + str ( i ) : [ ] } , ev [ ' payload ' ] [ ' data ' ]
assert isinstance ( conf , list ) == True , ' Not an list '
for index , step in enumerate ( conf ) :
mutationPayload = { ' query ' : step [ ' query ' ] }
if ' variables ' in step and step [ ' variables ' ] :
mutationPayload [ ' variables ' ] = json . loads ( step [ ' variables ' ] )
expected_resp = json . loads ( step [ ' response ' ] )
mutResp = ws_client_graphql_ws . send_query ( mutationPayload , ' mutation_ ' + str ( index ) , timeout = 15 )
ev = next ( mutResp )
assert ev [ ' type ' ] == ' next ' and ev [ ' id ' ] == ' mutation_ ' + str ( index ) , ev
assert ev [ ' payload ' ] [ ' data ' ] == expected_resp , ev [ ' payload ' ] [ ' data ' ]
ev = next ( mutResp )
assert ev [ ' type ' ] == ' complete ' and ev [ ' id ' ] == ' mutation_ ' + str ( index ) , ev
for ( i , resultLimit ) , respLive in zip ( queries , liveQs ) :
ev = next ( respLive )
assert ev [ ' type ' ] == ' next ' , ev
assert ev [ ' id ' ] == ' live_ ' + str ( i ) , ev
expectedReturnedResponse = [ ]
if ' live_response ' in step :
expectedReturnedResponse = json . loads ( step [ ' live_response ' ] )
elif ' returning ' in expected_resp [ step [ ' name ' ] ] :
expectedReturnedResponse = expected_resp [ step [ ' name ' ] ] [ ' returning ' ]
expectedLimitedResponse = expectedReturnedResponse [ : resultLimit ]
expectedLiveResponse = { ' hge_tests_live_query_ ' + str ( i ) : expectedLimitedResponse }
assert ev [ ' payload ' ] [ ' data ' ] == expectedLiveResponse , ev [ ' payload ' ] [ ' data ' ]
for i , _ in queries :
# stop live operation
frame = {
' id ' : ' live_ ' + str ( i ) ,
' type ' : ' complete '
}
ws_client_graphql_ws . send ( frame )
ws_client_graphql_ws . clear_queue ( )
2021-05-26 12:06:02 +03:00
@pytest.mark.parametrize ( " backend " , [ ' mssql ' , ' postgres ' ] )
2021-05-25 16:54:18 +03:00
@usefixtures ( ' per_class_tests_db_state ' )
2020-02-13 12:14:02 +03:00
class TestSubscriptionMultiplexing :
2019-01-25 06:31:54 +03:00
@classmethod
def dir ( cls ) :
2019-09-30 22:50:57 +03:00
return ' queries/subscriptions/multiplexing '
2020-11-03 11:15:22 +03:00
def test_extraneous_session_variables_are_discarded_from_query ( self , hge_ctx ) :
with open ( self . dir ( ) + ' /articles_query.yaml ' ) as c :
2022-01-17 10:39:59 +03:00
config = yaml . load ( c )
2020-11-03 11:15:22 +03:00
query = config [ ' query ' ]
session_variables = {
" X-Hasura-Role " : " public " ,
" X-Hasura-User-Id " : " 1 " # extraneous session variable
}
response = self . get_explain_graphql_query_response ( hge_ctx , query , { } , session_variables )
# The input session variables should be ignored because the only check for the role is
# if `is_public` is `true`
assert response [ " variables " ] [ " session " ] == { } , response [ " variables " ]
session_variables = {
" X-Hasura-Role " : " user " ,
" X-Hasura-User-Id " : " 1 " ,
" X-Hasura-Allowed-Ids " : " { 1,3,4} " # extraneous session variable
}
response = self . get_explain_graphql_query_response ( hge_ctx , query , { } , session_variables )
# The input session variable should not be ignored because the `user` role can only
# select those roles where `user_id = X-Hasura-User-Id`
assert response [ " variables " ] [ " session " ] == { ' x-hasura-user-id ' : " 1 " } , response [ " variables " ]
def get_explain_graphql_query_response ( self , hge_ctx , query , variables , user_headers = { } ) :
2019-09-30 22:50:57 +03:00
admin_secret = hge_ctx . hge_key
headers = { }
if admin_secret is not None :
headers [ ' X-Hasura-Admin-Secret ' ] = admin_secret
2020-11-03 11:15:22 +03:00
request = { ' query ' : { ' query ' : query , ' variables ' : variables } , ' user ' : user_headers }
2019-12-25 06:35:32 +03:00
status_code , response , _ = hge_ctx . anyq ( ' /v1/graphql/explain ' , request , headers )
2019-09-30 22:50:57 +03:00
assert status_code == 200 , ( request , status_code , response )
2020-11-03 11:15:22 +03:00
return response
2019-09-30 22:50:57 +03:00
2021-05-24 10:33:33 +03:00
2021-05-25 16:54:18 +03:00
@pytest.mark.parametrize ( " backend " , [ ' postgres ' ] )
@usefixtures ( ' per_class_tests_db_state ' , ' ws_conn_init ' )
2021-05-24 10:33:33 +03:00
class TestSubscriptionUDFWithSessionArg :
"""
Test a user - defined function which uses the entire session variables as argument
"""
query = """
subscription {
me {
id
name
}
}
"""
@classmethod
def dir ( cls ) :
return ' queries/subscriptions/udf_session_args '
def test_user_defined_function_with_session_argument ( self , hge_ctx , ws_client ) :
ws_client . init_as_admin ( )
headers = { ' x-hasura-role ' : ' user ' , ' x-hasura-user-id ' : ' 42 ' }
if hge_ctx . hge_key is not None :
headers [ ' X-Hasura-Admin-Secret ' ] = hge_ctx . hge_key
payload = { ' query ' : self . query }
resp = ws_client . send_query ( payload , headers = headers , timeout = 15 )
ev = next ( resp )
assert ev [ ' type ' ] == ' data ' , ev
assert ev [ ' payload ' ] [ ' data ' ] == { ' me ' : [ { ' id ' : ' 42 ' , ' name ' : ' Charlie ' } ] } , ev [ ' payload ' ] [ ' data ' ]
2021-10-29 17:42:07 +03:00
@pytest.mark.parametrize ( " backend " , [ ' mssql ' , ' postgres ' ] )
@usefixtures ( ' per_class_tests_db_state ' , ' ws_conn_init ' )
server/tests: Fix `BigQuery test failure Job exceeded rate limits` error in CI
Fixes https://github.com/hasura/graphql-engine-mono/issues/3695.
Error: [BigQuery test failure Job exceeded rate limits](https://github.com/hasura/graphql-engine-mono/issues/3695)
Cause:
1. [this command](https://github.com/hasura/graphql-engine/blob/2325755954bb3a777403503d709b412e01219ba9/.circleci/test-server.sh#L1263) runs tests matching the `Bigquery or Common` string, for the `test-oss-server-bigquery` CI job.
2. in this case, the pytest filter matched on `TestGraphQLQueryBoolExpSearchCommon`. Although unrelated pytests are skipped, BQ setup and teardown runs uneccesarily for the [MSSQL and Postgres backends](https://github.com/hasura/graphql-engine/blob/e444cf1f5d5eb1762357266d8b298b1dfb48d937/server/tests-py/test_graphql_queries.py#L868).
4. the setup and teardown runs three times in quick succession, _for each of_ SQL Server, Postgres and BigQuery. Occasionally, this surpassed [BigQuery's maximum rate of 5 table update operations in 10 seconds](https://cloud.google.com/bigquery/quotas#load_job_per_table.long).
Fix: restrict setup/teardown to only the relevant backends...
- Hotfix (this PR): ...by renaming pytest classes and changing the pytest filters in `test-server`
- ok, this is faintly horrifying and an inelegant convention change. On the bright side, it shaves a minute or so off our integration test suite run by skipping fewer tests. Anecdata for `test-oss-server-bigquery`
- before: 87 passed, 299 skipped, 1 warning, 1 error in 192.99s
- after: 87 passed, 20 skipped, 1 warning in 170.82s
- [`Common` was a terrible name, anyway](https://github.com/hasura/graphql-engine-mono/issues/2079), for `AnyCombinationOfBackends`.
- Better fix: ...by refactoring the `conftest.py` helpers. I ran out of a timebox so will write up a separate issue. Given we're actively [porting pytests over to hspec](https://github.com/hasura/graphql-engine/issues/8432), I don't know how much it's worth investing time in a refactor.
To verify the fix: I ran a full CI build a few times [[1]](https://buildkite.com/hasura/graphql-engine-mono/builds/8069#078c781a-c8ef-44f2-a400-15f91fb88e42)[[2]](https://buildkite.com/hasura/graphql-engine-mono/builds/8072#f9e7f59d-264f-46a4-973d-21aa762cca35)[[3]](https://buildkite.com/hasura/graphql-engine-mono/builds/8075#bb104e80-ff76-408c-a46b-6f40e92e6317) whilst troubleshooting to convince myself this fixed the problem.
PR-URL: https://github.com/hasura/graphql-engine-mono/pull/4362
GitOrigin-RevId: 4c3283f0654b70e9dcda642d9012f6376aa95290
2022-04-27 21:39:40 +03:00
class TestSubscriptionCustomizedSourceMSSQLPostgres :
2021-10-29 17:42:07 +03:00
@classmethod
def dir ( cls ) :
return ' queries/subscriptions/customized_source '
setup_metadata_api_version = " v2 "
'''
Refer : https : / / github . com / apollographql / subscriptions - transport - ws / blob / master / PROTOCOL . md #gql_complete
'''
def test_complete ( self , hge_ctx , ws_client ) :
query = """
subscription MySubscription {
a : my_source {
b : fpref_author_fsuff {
id
name
}
}
}
"""
obj = {
' id ' : ' 1 ' ,
' payload ' : {
' query ' : query
} ,
' type ' : ' start '
}
ws_client . send ( obj )
'''
Refer : https : / / github . com / apollographql / subscriptions - transport - ws / blob / master / PROTOCOL . md #gql_data
'''
ev = ws_client . get_ws_query_event ( ' 1 ' , 15 )
assert ev [ ' type ' ] == ' data ' and ev [ ' id ' ] == ' 1 ' , ev
assert ev [ ' payload ' ] [ ' data ' ] [ ' a ' ] == OrderedDict ( [ ( ' b ' , [ OrderedDict ( [ ( ' id ' , 1 ) , ( ' name ' , ' Author 1 ' ) ] ) , OrderedDict ( [ ( ' id ' , 2 ) , ( ' name ' , ' Author 2 ' ) ] ) ] ) ] ) , ev
def test_double_alias ( self , hge_ctx , ws_client ) :
'''
This should give an error even though @_multiple_top_level_fields is specified .
The two different aliases for ` my_source ` mean that we would have to wrap different
parts of the DB response in different namespace fields , which is not currently possible .
'''
query = """
subscription MySubscription @_multiple_top_level_fields {
alias1 : my_source {
fpref_author_fsuff {
id
name
}
}
alias2 : my_source {
fpref_author_fsuff {
id
name
}
}
}
"""
obj = {
' id ' : ' 2 ' ,
' payload ' : {
' query ' : query
} ,
' type ' : ' start '
}
ws_client . send ( obj )
'''
Refer : https : / / github . com / apollographql / subscriptions - transport - ws / blob / master / PROTOCOL . md #gql_data
'''
ev = ws_client . get_ws_query_event ( ' 2 ' , 15 )
assert ev [ ' type ' ] == ' error ' and ev [ ' id ' ] == ' 2 ' , ev
assert ev [ ' payload ' ] [ ' errors ' ] == [ OrderedDict ( [ ( ' extensions ' , OrderedDict ( [ ( ' path ' , ' $ ' ) , ( ' code ' , ' validation-failed ' ) ] ) ) , ( ' message ' , ' subscriptions must select one top level field ' ) ] ) ] , ev
2022-03-21 15:14:52 +03:00
@pytest.mark.parametrize ( " backend " , [ ' mssql ' ] )
@usefixtures ( ' per_class_tests_db_state ' , ' ws_conn_init ' )
class TestSubscriptionMSSQLChunkedResults :
@classmethod
def dir ( cls ) :
return ' queries/subscriptions/mssql '
query = """
subscription {
hge_tests_test_subscriptions {
field1
}
}
"""
def test_chunked_results ( self , ws_client ) :
obj = {
' id ' : ' 1 ' ,
' payload ' : {
' query ' : self . query
} ,
' type ' : ' start '
}
ws_client . send ( obj )
ev = ws_client . get_ws_query_event ( ' 1 ' , 15 )
assert ev [ ' type ' ] == ' data ' and ev [ ' id ' ] == ' 1 ' , ev
assert not " errors " in ev [ ' payload ' ] , ev