#!/usr/bin/env python3

import graphql
from http import HTTPStatus
import http.server
import json
import os
import queue
import random
import re
import requests
import ruamel.yaml as yaml
from ruamel.yaml.comments import CommentedMap as OrderedDict # to avoid '!!omap' in yaml
import socketserver
import sqlalchemy
import sqlalchemy.schema
import string
import subprocess
import threading
import time
from typing import Any, NamedTuple, Optional
from urllib.parse import urlparse
import websocket

import fixtures.tls
import graphql_server
import ports

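# Test-harness helpers for the Hasura GraphQL Engine Python test suite: WebSocket
# clients for the two subscription sub-protocols, webhook servers used by the
# actions and event-trigger tests, and the HGECtx object that the pytest fixtures
# hand to individual tests.
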
# pytest has removed the global pytest.config
# As a solution to this we are going to store it in PytestConf.config
class PytestConf():
    config: Any
    pass


class HGECtxError(Exception):
    pass


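# A minimal usage sketch for the two WebSocket client classes below (illustrative
# only; `hge_ctx` stands for the test fixture and is not defined here):
#
#   ws = GQLWsClient(hge_ctx, '/v1/graphql')
#   ws.init_as_admin()
#   events = ws.send_query({'query': 'subscription { user { id } }'}, query_id='q1', timeout=10)
#   first_event = next(events)
#   ws.stop('q1')
#   ws.teardown()
#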
# NOTE: use this to generate a GraphQL client that uses the `Apollo`(subscription-transport-ws) sub-protocol
class GQLWsClient():

    def __init__(self, hge_ctx, endpoint):
        self.hge_ctx = hge_ctx
        self.ws_queue = queue.Queue(maxsize=-1)
        self.ws_url = urlparse(hge_ctx.hge_url)._replace(scheme='ws',
                                                         path=endpoint)
        self.conn_created = False

    def create_conn(self):
        self.ws_queue.queue.clear()
        self.ws_id_query_queues = dict()
        self.ws_active_query_ids = set()

        self.connected_event = threading.Event()
        self.init_done = False
        self.is_closing = False
        self.remote_closed = False

        self._ws = websocket.WebSocketApp(self.ws_url.geturl(),
            on_open=self._on_open, on_message=self._on_message, on_close=self._on_close)
        self.wst = threading.Thread(target=self._ws.run_forever)
        self.wst.daemon = True
        self.wst.start()
        self.conn_created = True

    def wait_for_connection(self, timeout=10):
        assert not self.is_closing
        assert self.connected_event.wait(timeout=timeout)

    def get_ws_event(self, timeout):
        return self.ws_queue.get(timeout=timeout)

    def has_ws_query_events(self, query_id):
        return not self.ws_id_query_queues[query_id].empty()

    def get_ws_query_event(self, query_id, timeout):
        return self.ws_id_query_queues[query_id].get(timeout=timeout)

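    # If the underlying socket has already been closed, `send` re-initialises the
    # connection (re-sending the supplied headers) and retries the frame, giving
    # up after three attempts.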
    def send(self, frame, headers={}, count=0):
        self.wait_for_connection()
        if frame.get('type') == 'stop':
            self.ws_active_query_ids.discard(frame.get('id'))
        elif frame.get('type') == 'start' and 'id' in frame:
            self.ws_id_query_queues[frame['id']] = queue.Queue(maxsize=-1)
        try:
            self._ws.send(json.dumps(frame))
        except websocket.WebSocketConnectionClosedException:
            if count > 2:
                raise websocket.WebSocketConnectionClosedException("Connection is already closed and cannot be recreated even after 3 attempts")
            # Connection closed, try to recreate the connection and send the frame again
            self.init(headers)
            self.send(frame, headers, count + 1)

    def init_as_admin(self):
        headers = {}
        if self.hge_ctx.hge_key:
            headers = {'x-hasura-admin-secret': self.hge_ctx.hge_key}
        self.init(headers)

    def init(self, headers={}):
        self.create_conn()
        payload = {'type': 'connection_init', 'payload': {}}

        if headers and len(headers) > 0:
            payload['payload']['headers'] = headers

        self.send(payload)
        ev = self.get_ws_event(3)
        assert ev['type'] == 'connection_ack', ev
        self.init_done = True

    def stop(self, query_id):
        data = {'id': query_id, 'type': 'stop'}
        self.send(data)
        self.ws_active_query_ids.discard(query_id)

    def gen_id(self, size=6, chars=string.ascii_letters + string.digits):
        new_id = ''.join(random.choice(chars) for _ in range(size))
        if new_id in self.ws_active_query_ids:
            return self.gen_id(size, chars)
        return new_id

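    # Returns a generator: each `next()` call blocks (up to `timeout`) for the
    # next event delivered for this query id.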
    def send_query(self, query, query_id=None, timeout=60):
        if not self.init_done:
            self.init()
        graphql.parse(query['query'])
        if query_id is None:
            query_id = self.gen_id()
        frame = {
            'id': query_id,
            'type': 'start',
            'payload': query,
        }
        self.ws_active_query_ids.add(query_id)
        self.send(frame)
        while True:
            yield self.get_ws_query_event(query_id, timeout)

    def _on_open(self):
        if not self.is_closing:
            self.connected_event.set()

    def _on_message(self, message):
        # NOTE: make sure we preserve key ordering so we can test the ordering
        # properties in the graphql spec properly
        json_msg = json.loads(message, object_pairs_hook=OrderedDict)
        if 'id' in json_msg:
            query_id = json_msg['id']
            if json_msg.get('type') == 'stop':
                # Remove from active queries list
                self.ws_active_query_ids.discard(query_id)
            if query_id not in self.ws_id_query_queues:
                self.ws_id_query_queues[query_id] = queue.Queue(maxsize=-1)
            # Put event in the corresponding query queue
            self.ws_id_query_queues[query_id].put(json_msg)
        elif json_msg['type'] != 'ka':
            # Put event in the main queue
            self.ws_queue.put(json_msg)

    def _on_close(self):
        self.remote_closed = True
        self.init_done = False

    def get_conn_close_state(self):
        return self.remote_closed or self.is_closing

    def teardown(self):
        if self.conn_created:
            self.is_closing = True
            if not self.remote_closed:
                self._ws.close()
            self.wst.join()
        self.conn_created = False


# NOTE: use this to generate a GraphQL client that uses the `graphql-ws` sub-protocol
class GraphQLWSClient():

    def __init__(self, hge_ctx, endpoint):
        self.hge_ctx = hge_ctx
        self.ws_queue = queue.Queue(maxsize=-1)
        self.ws_url = urlparse(hge_ctx.hge_url)._replace(scheme='ws',
                                                         path=endpoint)
        self.conn_created = False

    def get_queue(self):
        return self.ws_queue.queue

    def clear_queue(self):
        self.ws_queue.queue.clear()

    def create_conn(self):
        self.ws_queue.queue.clear()
        self.ws_id_query_queues = dict()
        self.ws_active_query_ids = set()

        self.connected_event = threading.Event()
        self.init_done = False
        self.is_closing = False
        self.remote_closed = False

        self._ws = websocket.WebSocketApp(self.ws_url.geturl(),
            on_open=self._on_open, on_message=self._on_message, on_close=self._on_close, subprotocols=["graphql-transport-ws"])
        self.wst = threading.Thread(target=self._ws.run_forever)
        self.wst.daemon = True
        self.wst.start()
        self.conn_created = True

    def wait_for_connection(self, timeout=10):
        assert not self.is_closing
        assert self.connected_event.wait(timeout=timeout)

    def get_ws_event(self, timeout):
        return self.ws_queue.get(timeout=timeout)

    def has_ws_query_events(self, query_id):
        return not self.ws_id_query_queues[query_id].empty()

    def get_ws_query_event(self, query_id, timeout):
        print("HELLO", self.ws_active_query_ids)
        return self.ws_id_query_queues[query_id].get(timeout=timeout)

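    # Unlike GQLWsClient.send, this variant does not retry on a closed connection;
    # it only tracks 'subscribe'/'complete' frames before sending.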
    def send(self, frame):
        self.wait_for_connection()
        if frame.get('type') == 'complete':
            self.ws_active_query_ids.discard(frame.get('id'))
        elif frame.get('type') == 'subscribe' and 'id' in frame:
            self.ws_id_query_queues[frame['id']] = queue.Queue(maxsize=-1)
        self._ws.send(json.dumps(frame))

    def init_as_admin(self):
        headers = {}
        if self.hge_ctx.hge_key:
            headers = {'x-hasura-admin-secret': self.hge_ctx.hge_key}
        self.init(headers)

    def init(self, headers={}):
        self.create_conn()
        payload = {'type': 'connection_init', 'payload': {}}

        if headers and len(headers) > 0:
            payload['payload']['headers'] = headers

        self.send(payload)
        ev = self.get_ws_event(5)
        assert ev['type'] == 'connection_ack', ev
        self.init_done = True

    def stop(self, query_id):
        data = {'id': query_id, 'type': 'complete'}
        self.send(data)
        self.ws_active_query_ids.discard(query_id)

    def gen_id(self, size=6, chars=string.ascii_letters + string.digits):
        new_id = ''.join(random.choice(chars) for _ in range(size))
        if new_id in self.ws_active_query_ids:
            return self.gen_id(size, chars)
        return new_id

    def send_query(self, query, query_id=None, timeout=60):
        graphql.parse(query['query'])
        if not self.init_done:
            self.init()
        if query_id is None:
            query_id = self.gen_id()
        frame = {
            'id': query_id,
            'type': 'subscribe',
            'payload': query,
        }
        self.ws_active_query_ids.add(query_id)
        self.send(frame)
        while True:
            yield self.get_ws_query_event(query_id, timeout)

    def _on_open(self):
        if not self.is_closing:
            self.connected_event.set()

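    # Replies to server 'ping' frames with a 'pong', routes messages carrying an
    # id to their per-query queue, and puts every non-ping message on the main
    # queue.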
    def _on_message(self, message):
        # NOTE: make sure we preserve key ordering so we can test the ordering
        # properties in the graphql spec properly
        json_msg = json.loads(message, object_pairs_hook=OrderedDict)
        if json_msg['type'] == 'ping':
            new_msg = json_msg
            new_msg['type'] = 'pong'
            # Decline to reflect the payload of the ping, because the
            # graphql-ws specification does not require it
            new_msg.pop('payload', None)
            self.send(new_msg)
            return

        if 'id' in json_msg:
            query_id = json_msg['id']
            if json_msg.get('type') == 'complete':
                # Remove from active queries list
                self.ws_active_query_ids.discard(query_id)
            if query_id not in self.ws_id_query_queues:
                self.ws_id_query_queues[query_id] = queue.Queue(maxsize=-1)
            # Put event in the corresponding query queue
            self.ws_id_query_queues[query_id].put(json_msg)

        if json_msg['type'] != 'ping':
            self.ws_queue.put(json_msg)

    def _on_close(self):
        self.remote_closed = True
        self.init_done = False

    def get_conn_close_state(self):
        return self.remote_closed or self.is_closing

    def teardown(self):
        if self.conn_created:
            self.is_closing = True
            if not self.remote_closed:
                self._ws.close()
            self.wst.join()


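# Webhook backing the actions tests: every POST path handled below corresponds to
# an action handler returning a (response, status) pair, some of which call back
# into HGE over /v1/graphql.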
class ActionsWebhookHandler(http.server.BaseHTTPRequestHandler):
    hge_url: str
    hge_key: Optional[str]

    def do_GET(self):
        self.send_response(HTTPStatus.OK)
        self.end_headers()

    def do_POST(self):
        content_len = self.headers.get('Content-Length')
        req_body = self.rfile.read(int(content_len)).decode("utf-8")
        self.req_json = json.loads(req_body)
        req_headers = self.headers
        req_path = self.path
        self.log_message(json.dumps(self.req_json))

        if req_path == "/create-user":
            resp, status = self.create_user()
            self._send_response(status, resp)

        elif req_path == "/create-user-timeout":
            time.sleep(3)
            resp, status = self.create_user()
            self._send_response(status, resp)

        elif req_path == "/create-users":
            resp, status = self.create_users()
            self._send_response(status, resp)

        elif req_path == "/create-user-nested":
            resp, status = self.create_user_nested()
            self._send_response(status, resp)

        elif req_path == "/mirror-action":
            resp, status = self.mirror_action()
            self._send_response(status, resp)

        elif req_path == "/mirror-headers":
            resp, status = self.mirror_headers()
            self._send_response(status, resp)

        elif req_path == "/get-user-by-email":
            resp, status = self.get_users_by_email(True)
            self._send_response(status, resp)

        elif req_path == "/get-user-by-email-nested":
            resp, status = self.get_users_by_email_nested(True)
            self._send_response(status, resp)

        elif req_path == "/get-users-by-email":
            resp, status = self.get_users_by_email(False)
            self._send_response(status, resp)

        elif req_path == "/get-users-by-email-nested":
            resp, status = self.get_users_by_email_nested(False)
            self._send_response(status, resp)

        elif req_path == "/intentional-error":
            resp, status = self.intentional_error()
            self._send_response(status, resp)

        elif req_path == "/null-response":
            resp, status = self.null_response()
            self._send_response(status, resp)

        elif req_path == "/omitted-response-field":
            self._send_response(
                HTTPStatus.OK,
                self.get_omitted_response_field()
            )

        elif req_path == "/scalar-response":
            self._send_response(HTTPStatus.OK, "some-string")

        elif req_path == "/json-response":
            resp, status = self.json_response()
            self._send_response(status, resp)

        elif req_path == "/custom-scalar-array-response":
            resp, status = self.custom_scalar_array_response()
            self._send_response(status, resp)

        elif req_path == "/scalar-array-response":
            self._send_response(HTTPStatus.OK, ["foo", "bar", None])

        elif req_path == "/recursive-output":
            resp, status = self.recursive_output()
            self._send_response(status, resp)

        elif req_path == "/get-results":
            resp, status = self.get_results()
            self._send_response(status, resp)

        elif req_path == "/typed-nested-null":
            self._send_response(
                HTTPStatus.OK,
                self.get_typed_nested_null()
            )

        elif req_path == "/typed-nested-null-wrong-field":
            self._send_response(
                HTTPStatus.OK,
                self.get_typed_nested_null_wrong_field()
            )

        elif req_path == "/get_messages":
            resp, status = self.get_messages()
            self._send_response(status, resp)

        else:
            self.send_response(HTTPStatus.NO_CONTENT)
            self.end_headers()

    def intentional_error(self):
        blob = self.req_json['input']['blob']
        return blob, HTTPStatus.BAD_REQUEST

    def create_user(self):
        email_address = self.req_json['input']['email']
        name = self.req_json['input']['name']

        if not self.check_email(email_address):
            response = {
                'message': 'Given email address is not valid',
                'code': 'invalid-email'
            }
            return response, HTTPStatus.BAD_REQUEST

        gql_query = '''
        mutation ($email: String! $name: String!) {
          insert_user_one(object: {email: $email, name: $name}){
            id
          }
        }
        '''
        query = {
            'query': gql_query,
            'variables': {
                'email': email_address,
                'name': name
            }
        }
        code, resp = self.execute_query(query)
        if code != 200 or 'data' not in resp:
            response = {
                'message': 'GraphQL query execution failed',
                'code': 'unexpected'
            }
            return response, HTTPStatus.BAD_REQUEST

        response = resp['data']['insert_user_one']

        return response, HTTPStatus.OK

    def create_users(self):
        inputs = self.req_json['input']['users']
        for input in inputs:
            email_address = input['email']
            if not self.check_email(email_address):
                response = {
                    'message': 'Email address is not valid: ' + email_address,
                    'code': 'invalid-email'
                }
                return response, HTTPStatus.BAD_REQUEST

        gql_query = '''
        mutation ($insert_inputs: [user_insert_input!]!){
          insert_user(objects: $insert_inputs){
            returning{
              id
            }
          }
        }
        '''
        query = {
            'query': gql_query,
            'variables': {
                'insert_inputs': inputs
            }
        }
        code, resp = self.execute_query(query)
        if code != 200 or 'data' not in resp:
            response = {
                'message': 'GraphQL query execution failed',
                'code': 'unexpected'
            }
            return response, HTTPStatus.BAD_REQUEST

        response = resp['data']['insert_user']['returning']
        return response, HTTPStatus.OK

    def create_user_nested(self):
        email_address = self.req_json['input']['email']
        name = self.req_json['input']['name']

        if not self.check_email(email_address):
            response = {
                'message': 'Given email address is not valid',
                'code': 'invalid-email'
            }
            return response, HTTPStatus.BAD_REQUEST

        gql_query = '''
        mutation ($email: String! $name: String!) {
          insert_user_one(object: {email: $email, name: $name}){
            id
          }
        }
        '''
        query = {
            'query': gql_query,
            'variables': {
                'email': email_address,
                'name': name
            }
        }
        code, resp = self.execute_query(query)
        if code != 200 or 'data' not in resp:
            response = {
                'message': 'GraphQL query execution failed',
                'code': 'unexpected'
            }
            return response, HTTPStatus.BAD_REQUEST

        response = {
            'userObj': resp['data']['insert_user_one']
        }
        return response, HTTPStatus.OK

    def get_messages(self):
        response = [
            {"content": "baz", "user_name": "foo"},
            {"content": "foo", "user_name": "bar"}
        ]
        return response, HTTPStatus.OK

    def mirror_action(self):
        response = self.req_json['input']['arg']
        return response, HTTPStatus.OK

    def mirror_headers(self):
        response = {
            'headers': list(map(lambda header: {'name': header[0], 'value': header[1]}, self.headers.items()))
        }
        return response, HTTPStatus.OK

    def get_users_by_email(self, singleUser=False):
        email = self.req_json['input']['email']
        if not self.check_email(email):
            response = {
                'message': 'Given email address is not valid',
                'code': 'invalid-email'
            }
            return response, HTTPStatus.BAD_REQUEST
        gql_query = '''
        query get_user($email:String!) {
          user(where:{email:{_eq:$email}},order_by: {id: asc}) {
            id
          }
        }
        '''
        query = {
            'query': gql_query,
            'variables': {
                'email': email
            }
        }
        code, resp = self.execute_query(query)
        if code != 200 or 'data' not in resp:
            response = {
                'message': 'GraphQL query execution failed',
                'code': 'unexpected'
            }
            return response, HTTPStatus.BAD_REQUEST
        if singleUser:
            return resp['data']['user'][0], HTTPStatus.OK
        else:
            return resp['data']['user'], HTTPStatus.OK

    def get_users_by_email_nested(self, singleUser=False):
        resp, status = self.get_users_by_email(singleUser)

        def make_nested_out_object(outObj):
            address = {'city': 'New York', 'country': 'USA'}
            outObj['address'] = address
            addresses = [{'city': 'Bangalore', 'country': 'India'}, {'city': 'Melbourne', 'country': 'Australia'}]
            outObj['addresses'] = addresses
            outObj['user_id'] = {'id': outObj['id']}
            return outObj

        if status != HTTPStatus.OK:
            return resp, status
        if singleUser:
            return make_nested_out_object(resp), status
        else:
            return map(make_nested_out_object, resp), status

    def get_typed_nested_null(self):
        return {
            'id': 1,
            'child': None
        }

    def get_omitted_response_field(self):
        return {
            'country': 'India'
        }

    def get_typed_nested_null_wrong_field(self):
        return {
            'id': None,
            'child': None
        }

    def null_response(self):
        response = None
        return response, HTTPStatus.OK

    def json_response(self):
        response = {
            'foo': 'bar'
        }
        return response, HTTPStatus.OK

    def custom_scalar_array_response(self):
        response = [{
            'foo': 'bar'
        }]
        return response, HTTPStatus.OK

    def recursive_output(self):
        return {
            'direct': {'id': 1, 'this': {'id': 2, 'this': {'id': 3}}},
            'list': {'id': 1, 'these': [{'id': 2, 'these': [{'id': 3}]}, {'id': 4}]},
            'mutual': {'id': 1, 'that': {'id': 2, 'other': {'id': 3, 'that': {'id': 4}}}}
        }, HTTPStatus.OK

    def get_results(self):
        return {
            'result_ids': [1, 2, 3, 4]
        }, HTTPStatus.OK

    def check_email(self, email):
        regex = '^\\w+([\\.-]?\\w+)*@\\w+([\\.-]?\\w+)*(\\.\\w{2,3})+$'
        return re.search(regex, email)

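    # Forwards a GraphQL query to HGE, adding the admin-secret header when one is
    # configured, and returns (status_code, decoded JSON body).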
    def execute_query(self, query):
        headers = {}
        admin_secret = self.hge_key
        if admin_secret is not None:
            headers['X-Hasura-Admin-Secret'] = admin_secret
        resp = requests.post(
            self.hge_url + '/v1/graphql',
            json=query,
            headers=headers,
            timeout=60,
        )
        data = resp.json(object_pairs_hook=OrderedDict)
        self.log_message(json.dumps(data))
        return resp.status_code, data

    def _send_response(self, status, body):
        self.log_request(status)
        self.send_response_only(status)
        self.send_header('Content-Type', 'application/json')
        self.send_header('Set-Cookie', 'abcd')
        self.end_headers()
        self.wfile.write(json.dumps(body).encode("utf-8"))


class ActionsWebhookServer(http.server.HTTPServer):
    def __init__(self, hge_url, hge_key, server_address):
        handler = ActionsWebhookHandler
        handler.hge_url = hge_url
        handler.hge_key = hge_key
        super().__init__(server_address, handler)

    @property
    def url(self):
        return f'http://{self.server_address[0]}:{self.server_address[1]}'


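# Webhook used by the event-trigger tests. Besides the default 204 response it
# offers /fail (500), /sleep_2s, and /block (which parks the request until the
# server's unblock() is called); every delivery is pushed onto the server's
# resp_queue so tests can assert on it.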
class EvtsWebhookHandler(http.server.BaseHTTPRequestHandler):
    server: 'EvtsWebhookServer'  # type: ignore

    def do_GET(self):
        self.send_response(HTTPStatus.OK)
        self.end_headers()

    def do_POST(self):
        content_len = int(self.headers['Content-Length'])
        req_body = self.rfile.read(content_len).decode("utf-8")
        req_json = json.loads(req_body)
        req_headers = self.headers
        req_path = self.path
        self.log_message(json.dumps(req_json))
        if req_path == "/fail":
            self.send_response(HTTPStatus.INTERNAL_SERVER_ERROR)
            self.end_headers()
        # This endpoint just sleeps for 2 seconds:
        elif req_path == "/sleep_2s":
            time.sleep(2)
            self.send_response(HTTPStatus.NO_CONTENT)
            self.end_headers()
        # This is like the sleep endpoint above, but allows us to decide
        # externally when the webhook can return, with unblock()
        elif req_path == "/block":
            if not self.server.unblocked:
                self.server.blocked_count += 1
                with self.server.unblocked_wait:
                    # We expect this timeout never to be reached, but if
                    # something goes wrong the main thread will block forever:
                    self.server.unblocked_wait.wait(timeout=60)
                self.server.blocked_count -= 1
            self.send_response(HTTPStatus.NO_CONTENT)
            self.end_headers()
        else:
            self.send_response(HTTPStatus.NO_CONTENT)
            self.end_headers()

        self.server.resp_queue.put({
            "path": req_path,
            "body": req_json,
            "headers": req_headers,
        })


# A very slightly more sane/performant http server.
# See: https://stackoverflow.com/a/14089457/176841
#
# TODO use this elsewhere, or better yet: use e.g. bottle + waitress
class ThreadedHTTPServer(socketserver.ThreadingMixIn, http.server.HTTPServer):
    """Handle requests in a separate thread."""

    @property
    def host(self):
        # We assume that everything runs on "localhost"
        return 'localhost'

    @property
    def port(self):
        if not self.server_address:
            raise Exception('The server is not started.')
        return self.server_address[1]

    @property
    def url(self):
        return f'http://{self.host}:{self.port}'


class EvtsWebhookServer(ThreadedHTTPServer):
    def __init__(self, server_address):
        # Data received from hasura by our web hook, pushed after it returns to the client:
        self.resp_queue = queue.Queue()
        # We use these two vars to coordinate unblocking in the /block route
        self.unblocked = False
        self.unblocked_wait = threading.Condition()
        # ...and this for bookkeeping open blocked requests; this becomes
        # meaningless after the first call to unblock()
        self.blocked_count = 0

        super().__init__(server_address, EvtsWebhookHandler)

    # Unblock all webhook requests to /block. Idempotent.
    def unblock(self):
        self.unblocked = True
        with self.unblocked_wait:
            # NOTE: this only affects currently wait()-ing threads, future
            # wait()s will block again (hence the simple self.unblocked flag)
            self.unblocked_wait.notify_all()

    def get_event(self, timeout):
        return self.resp_queue.get(timeout=timeout)

    def is_queue_empty(self):
        return self.resp_queue.empty()


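# Thin lifecycle wrapper around the remote-schema GraphQL server provided by the
# graphql_server module: starts it on a background thread, remembers the
# dynamically assigned port, and exposes the http(s) URL to tests.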
class HGECtxGQLServer:
    def __init__(self, server_address: tuple[str, int], tls_ca_configuration: Optional[fixtures.tls.TLSCAConfiguration] = None, hge_urls: list[str] = []):
        self.server_address = server_address
        self.tls_ca_configuration = tls_ca_configuration
        self.server: Optional[http.server.HTTPServer] = None

    def start_server(self):
        if not self.server:
            self.server = graphql_server.create_server(self.server_address, self.tls_ca_configuration)
            self.thread = threading.Thread(target=self.server.serve_forever)
            self.thread.start()
            # If the port is specified as 0, we will get a different,
            # dynamically-allocated port whenever we restart. This captures the
            # actual assigned port so that we re-use it.
            self.server_address = self.server.server_address
        ports.wait_for_port(self.port)

    def stop_server(self):
        if self.server:
            graphql_server.stop_server(self.server)
            self.thread.join()
            self.server = None

    @property
    def url(self):
        scheme = 'https' if self.tls_ca_configuration else 'http'
        return f'{scheme}://{self.host}:{self.port}'

    @property
    def host(self):
        # We must use 'localhost' and not `self.server.server_address[0]`
        # because when using TLS, we need a domain name, not an IP address.
        return 'localhost'

    @property
    def port(self):
        if not self.server:
            raise Exception('The server is not started.')
        return self.server.server_address[1]


class HGECtxWebhook(NamedTuple):
    tls_trust: Optional[fixtures.tls.TLSTrust]


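# The central per-test-run context: an HTTP session and SQLAlchemy engine for
# talking to HGE and its metadata database, WebSocket clients for each GraphQL
# endpoint, and convenience wrappers (v1q, v2q, v1metadataq, ...) around the
# HTTP APIs.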
class HGECtx:

    def __init__(
        self,
        hge_url: str,
        metadata_schema_url: str,
        hge_key: Optional[str],
        webhook: Optional[HGECtxWebhook],
        enabled_apis: Optional[set[str]],
        clear_dbs: bool,
        config,
    ):
        self.http = requests.Session()
        self.timeout = 120  # BigQuery can take a while

        self.hge_url = hge_url
        self.metadata_schema_url = metadata_schema_url
        self.hge_key = hge_key
        self.webhook = webhook
        self.may_skip_test_teardown = False

        # This will be GC'd, but we also explicitly dispose() in teardown()
        self.engine: sqlalchemy.engine.Engine = sqlalchemy.create_engine(self.metadata_schema_url)  # type: ignore
        self.meta = sqlalchemy.schema.MetaData()

        self.hge_scale_url = config.getoption('--test-hge-scale-url')
        self.pro_tests = config.getoption('--pro-tests')

        self.ws_client = GQLWsClient(self, '/v1/graphql')
        self.ws_client_v1alpha1 = GQLWsClient(self, '/v1alpha1/graphql')
        self.ws_client_relay = GQLWsClient(self, '/v1beta1/relay')
        self.ws_client_graphql_ws = GraphQLWSClient(self, '/v1/graphql')

        self.backend = config.getoption('--backend')
        self.default_backend = 'postgres'
        self.is_default_backend = self.backend == self.default_backend

        env_version = os.getenv('VERSION')
        if env_version:
            self.version = env_version
        else:
            # HGE version
            result = subprocess.run(['../../scripts/get-version.sh'], shell=False, stdout=subprocess.PIPE, check=True)
            self.version = result.stdout.decode('utf-8').strip()

        # TODO: remove once parallelization work is completed
        if clear_dbs and self.is_default_backend and (not enabled_apis or 'metadata' in enabled_apis):
            try:
                self.v2q_f("queries/" + self.backend_suffix("clear_db") + ".yaml")
            except requests.exceptions.RequestException as e:
                self.teardown()
                raise HGECtxError(repr(e))

    def reflect_tables(self):
        self.meta.reflect(bind=self.engine)

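    # Generic request helper: `u` is the URL path, `h` the headers, `v` the HTTP
    # verb ('GET', 'POSTJSON', 'POST', 'PATCH', 'PUT', 'DELETE'; anything else
    # falls back to POSTing `q` as JSON), and `b` the request body for the verbs
    # that send one.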
    def anyq(self, u, q, h, b=None, v=None):

        resp = None
        if v == 'GET':
            resp = self.http.get(
                self.hge_url + u,
                headers=h,
                timeout=self.timeout,
            )
        elif v == 'POSTJSON' and b:
            resp = self.http.post(
                self.hge_url + u,
                json=b,
                headers=h,
                timeout=self.timeout,
            )
        elif v == 'POST' and b:
            # TODO: Figure out why the requests are failing with a byte object passed in as `data`
            resp = self.http.post(
                self.hge_url + u,
                data=b,
                headers=h,
                timeout=self.timeout,
            )
        elif v == 'PATCH' and b:
            resp = self.http.patch(
                self.hge_url + u,
                data=b,
                headers=h,
                timeout=self.timeout,
            )
        elif v == 'PUT' and b:
            resp = self.http.put(
                self.hge_url + u,
                data=b,
                headers=h,
                timeout=self.timeout,
            )
        elif v == 'DELETE':
            resp = self.http.delete(
                self.hge_url + u,
                headers=h,
                timeout=self.timeout,
            )
        else:
            resp = self.http.post(
                self.hge_url + u,
                json=q,
                headers=h,
                timeout=self.timeout,
            )
        # NOTE: make sure we preserve key ordering so we can test the ordering
        # properties in the graphql spec properly
        # Returning response headers to get the request id from response
        return resp.status_code, resp.json(object_pairs_hook=OrderedDict), resp.headers

    # Executes a query, but does not return the result.
    def sql(self, q):
        with self.engine.connect() as conn:
            conn.execute(q)

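    # POSTs `q` to the given HGE path, injecting the admin secret unless the caller
    # already supplied one, and asserts the response status when
    # `expected_status_code` is given.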
    def execute_query(self, q, url_path, headers={}, expected_status_code=200):
        h = headers.copy()
        if self.hge_key is not None and 'X-Hasura-Admin-Secret' not in headers:
            h['X-Hasura-Admin-Secret'] = self.hge_key
        resp = self.http.post(
            self.hge_url + url_path,
            json=q,
            headers=h,
            timeout=self.timeout,
        )
        # NOTE: make sure we preserve key ordering so we can test the ordering
        # properties in the graphql spec properly
        # Don't assume `resp` is a JSON object
        try:
            resp_obj = resp.json(object_pairs_hook=OrderedDict)
        except requests.exceptions.JSONDecodeError:
            resp_obj = resp.text
        if expected_status_code:
            assert \
                resp.status_code == expected_status_code, \
                f'Expected {resp.status_code} to be {expected_status_code}.\nRequest:\n{json.dumps(q, indent=2)}\nResponse:\n{json.dumps(resp_obj, indent=2)}'
        return resp_obj

    def v1q(self, q, headers={}, expected_status_code=200):
        return self.execute_query(q, "/v1/query", headers, expected_status_code)

    def v1q_f(self, filepath, headers={}, expected_status_code=200):
        with open(filepath) as f:
            # NOTE: preserve ordering with ruamel
            yml = yaml.YAML()
            return self.v1q(yml.load(f), headers, expected_status_code)

    def v2q(self, q, headers={}, expected_status_code=200):
        return self.execute_query(q, "/v2/query", headers, expected_status_code)

    def v2q_f(self, filepath, headers={}, expected_status_code=200):
        with open(filepath) as f:
            # NOTE: preserve ordering with ruamel
            yml = yaml.YAML()
            return self.v2q(yml.load(f), headers, expected_status_code)

    def backend_suffix(self, filename):
        if self.is_default_backend:
            return filename
        else:
            return filename + "_" + self.backend

    def v1metadataq(self, q, headers={}, expected_status_code=200):
        return self.execute_query(q, "/v1/metadata", headers, expected_status_code)

    def v1metadataq_f(self, filepath, headers={}, expected_status_code=200):
        with open(filepath) as f:
            # NOTE: preserve ordering with ruamel
            yml = yaml.YAML()
            return self.v1metadataq(yml.load(f), headers, expected_status_code)

    def v1graphqlq(self, q, headers={}, expected_status_code=200):
        return self.execute_query(q, "/v1/graphql", headers, expected_status_code)

    def v1graphql_f(self, filepath, headers={}, expected_status_code=200):
        with open(filepath) as f:
            # NOTE: preserve ordering with ruamel
            yml = yaml.YAML()
            return self.v1graphqlq(yml.load(f), headers, expected_status_code)

    def teardown(self):
        self.http.close()
        self.engine.dispose()
        # Close websockets:
        self.ws_client.teardown()
        self.ws_client_v1alpha1.teardown()
        self.ws_client_relay.teardown()
        self.ws_client_graphql_ws.teardown()

    def v1GraphqlExplain(self, q, headers={}, expected_status_code=200):
        return self.execute_query(q, '/v1/graphql/explain', headers, expected_status_code)