mirror of
https://github.com/hasura/graphql-engine.git
synced 2025-01-07 08:13:18 +03:00
204ec89c61
This rewrites the last couple of Python tests that were failing when run with a separate HGE binary per test class. The changes are as follows: 1. The event triggers tests, naming conventions tests, and subscriptions tests all generate a new source DB per test, so can run in parallel. 2. The scheduled triggers tests use the correct URL for the trigger service when the port is generated randomly. 3. Whitespace and trailing commas are added to the scheduled triggers tests. 4. Support for SQL Server is added to _hge.py_ so the naming conventions test that runs on SQL Server passes. (The other SQL Server tests do not pass and we're not going to bother with them for now.) 5. Container names are fixed in _run.sh_. 6. _run.sh_ and _run-new.sh_ don't pull images explicitly as it's annoying when running tests a lot. If you want to pull the latest versions, just run `docker compose pull` from the _server/tests-py_ directory, or the root directory. (If you don't have the images at all, they'll still be pulled automatically.) PR-URL: https://github.com/hasura/graphql-engine-mono/pull/7350 GitOrigin-RevId: db58f310f017b2a0884fcf61ccc56d15583f99bd
401 lines
16 KiB
Python
401 lines
16 KiB
Python
from croniter import croniter
|
|
from datetime import datetime, timedelta
|
|
import itertools
|
|
import json
|
|
import sqlalchemy
|
|
|
|
from validate import validate_event_headers, validate_event_webhook
|
|
from utils import until_asserts_pass
|
|
|
|
# The create and delete tests should ideally go in setup and teardown YAML files,
|
|
# We can't use that here because, the payload is dynamic i.e. in case of one-off scheduled events
|
|
# the value is the current timestamp and in case of cron Triggers, the cron schedule is
|
|
# derived based on the current timestamp
|
|
|
|
def stringify_datetime(dt):
|
|
return dt.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
|
|
class TestScheduledEvent(object):
|
|
|
|
@classmethod
|
|
def dir(cls):
|
|
return 'queries/scheduled_triggers'
|
|
|
|
webhook_payload = {"foo": "baz"}
|
|
|
|
header_conf = [
|
|
{
|
|
"name": "header-key",
|
|
"value": "header-value"
|
|
}
|
|
]
|
|
|
|
def test_scheduled_events(self, hge_ctx, scheduled_triggers_evts_webhook, metadata_schema_url):
|
|
metadata_engine = sqlalchemy.engine.create_engine(metadata_schema_url)
|
|
|
|
query = {
|
|
"type": "bulk",
|
|
"args": [
|
|
# Succeeds
|
|
{
|
|
"type": "create_scheduled_event",
|
|
"args": {
|
|
"webhook": f'{scheduled_triggers_evts_webhook.url}/test',
|
|
"schedule_at": stringify_datetime(datetime.utcnow()),
|
|
"payload": self.webhook_payload,
|
|
"headers": self.header_conf,
|
|
"comment": "test scheduled event",
|
|
},
|
|
},
|
|
# Fails immediately, with 'dead'
|
|
{
|
|
"type": "create_scheduled_event",
|
|
"args": {
|
|
"webhook": f'{scheduled_triggers_evts_webhook.url}/',
|
|
"schedule_at": "2020-01-01T00:00:00Z",
|
|
"payload": self.webhook_payload,
|
|
"headers": self.header_conf,
|
|
},
|
|
},
|
|
# Fails on request, trying twice:
|
|
{
|
|
"type": "create_scheduled_event",
|
|
"args": {
|
|
"webhook": f'{scheduled_triggers_evts_webhook.url}/fail',
|
|
"schedule_at": stringify_datetime(datetime.utcnow()),
|
|
"payload": self.webhook_payload,
|
|
"headers": self.header_conf,
|
|
"retry_conf": {
|
|
"num_retries": 1,
|
|
"retry_interval_seconds": 1,
|
|
"timeout_seconds": 1,
|
|
"tolerance_seconds": 21600,
|
|
},
|
|
},
|
|
},
|
|
],
|
|
}
|
|
resp = hge_ctx.v1q(query)
|
|
assert len(resp) == 3, resp
|
|
# ensuring that valid event_id is returned for all requests
|
|
assert all(['event_id' in r for r in resp]), resp
|
|
|
|
# Here we check the three requests received by the webhook.
|
|
# Collect the three generated events (they may arrive out of order):
|
|
e1 = scheduled_triggers_evts_webhook.get_event(12) # at least 10 sec, see processScheduledTriggers.sleep
|
|
e2 = scheduled_triggers_evts_webhook.get_event(12)
|
|
e3 = scheduled_triggers_evts_webhook.get_event(12)
|
|
[event_fail1, event_fail2, event_success] = sorted([e1,e2,e3], key=lambda e: e['path'])
|
|
# Check the two failures:
|
|
validate_event_webhook(event_fail1['path'],'/fail')
|
|
validate_event_webhook(event_fail2['path'],'/fail')
|
|
|
|
# Check the one successful webhook call:
|
|
with metadata_engine.connect() as connection:
|
|
query = '''
|
|
select to_json(timezone('utc', created_at)) as created_at
|
|
from hdb_catalog.hdb_scheduled_events
|
|
where comment = 'test scheduled event'
|
|
'''
|
|
result = connection.execute(query).fetchone()
|
|
assert result is not None
|
|
db_created_at = result['created_at']
|
|
|
|
validate_event_webhook(event_success['path'], '/test')
|
|
validate_event_headers(event_success['headers'], {"header-key": "header-value"})
|
|
assert event_success['body']['payload'] == self.webhook_payload
|
|
assert event_success['body']['created_at'] == db_created_at.replace(" ","T") + "Z"
|
|
payload_keys = dict.keys(event_success['body'])
|
|
for k in ["scheduled_time","created_at","id"]: # additional keys
|
|
assert k in payload_keys
|
|
assert scheduled_triggers_evts_webhook.is_queue_empty()
|
|
|
|
def try_check_events_statuses():
|
|
with metadata_engine.connect() as connection:
|
|
scheduled_event_statuses = list(
|
|
connection.execute(
|
|
"select status, tries from hdb_catalog.hdb_scheduled_events order by status desc"
|
|
).fetchall()
|
|
)
|
|
# 3 scheduled events have been created
|
|
# one should be dead because the timestamp was past the tolerance limit
|
|
# one should be delivered because all the parameters were reasonable
|
|
# one should be error because the webhook returns an error state
|
|
assert scheduled_event_statuses == [
|
|
# status tries
|
|
( 'error', 2), # num_retries + 1
|
|
( 'delivered', 1),
|
|
( 'dead', 0),
|
|
]
|
|
|
|
until_asserts_pass(100, try_check_events_statuses)
|
|
|
|
# WARNING: The tests in this class are not independent; they depend on the side effects of previous tests.
|
|
class TestCronTrigger(object):
|
|
|
|
cron_trigger_name = "cron_trigger"
|
|
# setting the test to be after 30 mins, to make sure that
|
|
# any of the events are not delivered.
|
|
min_after_30_mins = (datetime.utcnow() + timedelta(minutes=30)).minute
|
|
cron_schedule = "{} * * * *".format(min_after_30_mins)
|
|
init_time = datetime.utcnow()
|
|
|
|
def test_create_cron_schedule_triggers(self, hge_ctx, scheduled_triggers_evts_webhook):
|
|
cron_st_api_query = {
|
|
"type": "create_cron_trigger",
|
|
"args": {
|
|
"name": self.cron_trigger_name,
|
|
"webhook": f"{scheduled_triggers_evts_webhook.url}/foo",
|
|
"schedule": self.cron_schedule,
|
|
"headers": [
|
|
{
|
|
"name": "foo",
|
|
"value": "baz",
|
|
},
|
|
],
|
|
"payload": {"foo": "baz"},
|
|
"include_in_metadata": True,
|
|
},
|
|
}
|
|
resp = hge_ctx.v1q(cron_st_api_query)
|
|
# the cron events will be generated based on the current time, they
|
|
# will not be exactly the same though(the server now and now here)
|
|
assert resp['message'] == 'success'
|
|
|
|
def test_check_generated_cron_scheduled_events(self, metadata_schema_url):
|
|
metadata_engine = sqlalchemy.engine.create_engine(metadata_schema_url)
|
|
|
|
schedule = croniter(self.cron_schedule, self.init_time)
|
|
expected_scheduled_timestamps = list(itertools.islice(schedule.all_next(datetime), 100))
|
|
self.verify_timestamps(metadata_engine, expected_scheduled_timestamps)
|
|
|
|
def test_update_existing_cron_trigger(self ,hge_ctx, metadata_schema_url, scheduled_triggers_evts_webhook):
|
|
metadata_engine = sqlalchemy.engine.create_engine(metadata_schema_url)
|
|
|
|
expected_scheduled_timestamps = []
|
|
iter = croniter(self.cron_schedule,datetime.utcnow())
|
|
for _ in range(100):
|
|
expected_scheduled_timestamps.append(iter.next(datetime))
|
|
q = {
|
|
"type": "create_cron_trigger",
|
|
"args": {
|
|
"name": self.cron_trigger_name,
|
|
"webhook": f"{scheduled_triggers_evts_webhook.url}/foo",
|
|
"schedule": self.cron_schedule,
|
|
"headers": [
|
|
{
|
|
"name": "header-name",
|
|
"value": "header-value",
|
|
},
|
|
],
|
|
"payload": {"foo": "baz"},
|
|
"include_in_metadata": True,
|
|
"replace": True,
|
|
},
|
|
}
|
|
hge_ctx.v1q(q)
|
|
|
|
resp = hge_ctx.v1q({'type': 'export_metadata', 'args': {}})
|
|
|
|
all_cron_triggers = resp['cron_triggers']
|
|
for cron_trigger in all_cron_triggers:
|
|
if cron_trigger['name'] == self.cron_trigger_name:
|
|
assert cron_trigger['headers'] == [{
|
|
"name": "header-name",
|
|
"value": "header-value",
|
|
}]
|
|
|
|
# After updating the cron trigger, the future events should have been created
|
|
self.verify_timestamps(metadata_engine, expected_scheduled_timestamps)
|
|
|
|
def test_check_fired_webhook_event(self, hge_ctx, scheduled_triggers_evts_webhook):
|
|
q = {
|
|
"type": "create_cron_trigger",
|
|
"args": {
|
|
"name": "test_cron_trigger",
|
|
"webhook": f"{scheduled_triggers_evts_webhook.url}/test",
|
|
"schedule": "* * * * *",
|
|
"headers": [
|
|
{
|
|
"name": "header-key",
|
|
"value": "header-value",
|
|
},
|
|
],
|
|
"payload": {"foo": "baz"},
|
|
"include_in_metadata": False,
|
|
},
|
|
}
|
|
hge_ctx.v1q(q)
|
|
# The maximum timeout is set to 75s because, the cron timestamps
|
|
# that are generated will start from the next minute, suppose
|
|
# the cron schedule is "* * * * *" and the time the cron trigger
|
|
# is created is 10:00:00, then the next event will be scheduled
|
|
# at 10:01:00, but the events processor will not process it
|
|
# exactly at the zeroeth second of 10:01. The only guarantee
|
|
# is that, the event processor will start to process the event before
|
|
# 10:01:10 (seel sleep in processScheduledTriggers). So, in the worst
|
|
# case, it will take 70 seconds to process the first scheduled event.
|
|
event = scheduled_triggers_evts_webhook.get_event(75)
|
|
validate_event_webhook(event['path'], '/test')
|
|
validate_event_headers(event['headers'], {"header-key":"header-value"})
|
|
assert event['body']['payload'] == {"foo": "baz"}
|
|
assert event['body']['name'] == 'test_cron_trigger'
|
|
|
|
def test_get_cron_triggers(self, hge_ctx, scheduled_triggers_evts_webhook):
|
|
q = {
|
|
"type": "get_cron_triggers",
|
|
"args": {}
|
|
}
|
|
resp = hge_ctx.v1metadataq(q)
|
|
respDict = json.loads(json.dumps(resp))
|
|
assert respDict['cron_triggers'] == [
|
|
{
|
|
"headers": [
|
|
{
|
|
"name": "header-name",
|
|
"value": "header-value",
|
|
}
|
|
],
|
|
"include_in_metadata": True,
|
|
"name": self.cron_trigger_name,
|
|
"payload": {
|
|
"foo": "baz",
|
|
},
|
|
"retry_conf": {
|
|
"num_retries": 0,
|
|
"retry_interval_seconds": 10,
|
|
"timeout_seconds": 60,
|
|
"tolerance_seconds": 21600,
|
|
},
|
|
"schedule": self.cron_schedule,
|
|
"webhook": f"{scheduled_triggers_evts_webhook.url}/foo",
|
|
},
|
|
{
|
|
"headers": [
|
|
{
|
|
"name": "header-key",
|
|
"value": "header-value",
|
|
},
|
|
],
|
|
"include_in_metadata": False,
|
|
"name": "test_cron_trigger",
|
|
"payload": {
|
|
"foo": "baz",
|
|
},
|
|
"retry_conf": {
|
|
"num_retries": 0,
|
|
"retry_interval_seconds": 10,
|
|
"timeout_seconds": 60,
|
|
"tolerance_seconds": 21600,
|
|
},
|
|
"schedule": "* * * * *",
|
|
"webhook": f"{scheduled_triggers_evts_webhook.url}/test",
|
|
},
|
|
]
|
|
|
|
|
|
def test_export_and_import_cron_triggers(self, hge_ctx, metadata_schema_url, scheduled_triggers_evts_webhook):
|
|
metadata_engine = sqlalchemy.engine.create_engine(metadata_schema_url)
|
|
|
|
q = {
|
|
"type": "export_metadata",
|
|
"args": {}
|
|
}
|
|
resp = hge_ctx.v1q(q)
|
|
respDict = json.loads(json.dumps(resp))
|
|
# Only the cron triggers with `include_in_metadata` set to `True`
|
|
# should be exported
|
|
assert respDict['cron_triggers'] == [
|
|
{
|
|
"headers": [
|
|
{
|
|
"name": "header-name",
|
|
"value": "header-value",
|
|
}
|
|
],
|
|
"include_in_metadata": True,
|
|
"name": self.cron_trigger_name,
|
|
"payload": {
|
|
"foo": "baz"
|
|
},
|
|
"schedule": self.cron_schedule,
|
|
"webhook": f"{scheduled_triggers_evts_webhook.url}/foo",
|
|
},
|
|
]
|
|
q = {
|
|
"type": "replace_metadata",
|
|
"args": {
|
|
"metadata": resp,
|
|
},
|
|
}
|
|
resp = hge_ctx.v1q(q)
|
|
|
|
with metadata_engine.connect() as connection:
|
|
sql = '''
|
|
select count(1) as count
|
|
from hdb_catalog.hdb_cron_events
|
|
where trigger_name = %s
|
|
'''
|
|
result = connection.execute(sql, (self.cron_trigger_name,)).fetchone()
|
|
assert result is not None
|
|
count = result['count']
|
|
# Check if the future cron events are created for
|
|
# for a cron trigger while imported from the metadata
|
|
assert int(count) == 100
|
|
|
|
def test_attempt_to_create_duplicate_cron_trigger_fail(self, hge_ctx, scheduled_triggers_evts_webhook):
|
|
q = {
|
|
"type": "create_cron_trigger",
|
|
"args": {
|
|
"name": "test_cron_trigger",
|
|
"webhook": f"{scheduled_triggers_evts_webhook.url}/test",
|
|
"schedule": "* * * * *",
|
|
"headers": [
|
|
{
|
|
"name": "header-key",
|
|
"value": "header-value",
|
|
},
|
|
],
|
|
"payload": {"foo": "baz"},
|
|
"include_in_metadata": False,
|
|
},
|
|
}
|
|
resp = hge_ctx.v1q(q, expected_status_code = 400)
|
|
assert dict(resp) == {
|
|
"code": "already-exists",
|
|
"error": 'cron trigger with name: test_cron_trigger already exists',
|
|
"path": "$.args"
|
|
}
|
|
|
|
def test_delete_cron_scheduled_trigger(self,hge_ctx):
|
|
q = {
|
|
"type": "bulk",
|
|
"args": [
|
|
{
|
|
"type": "delete_cron_trigger",
|
|
"args": {
|
|
"name": self.cron_trigger_name,
|
|
},
|
|
},
|
|
{
|
|
"type": "delete_cron_trigger",
|
|
"args": {
|
|
"name": "test_cron_trigger",
|
|
},
|
|
},
|
|
],
|
|
}
|
|
hge_ctx.v1q(q)
|
|
|
|
def verify_timestamps(self, metadata_engine, expected_scheduled_timestamps):
|
|
# Get timestamps in UTC from the db to compare them with the croniter-generated timestamps
|
|
with metadata_engine.connect() as connection:
|
|
sql = '''
|
|
select timezone('utc', scheduled_time) as scheduled_time
|
|
from hdb_catalog.hdb_cron_events
|
|
where trigger_name = %s
|
|
order by scheduled_time asc
|
|
'''
|
|
actual_scheduled_timestamps = list(scheduled_time for (scheduled_time,) in connection.execute(sql, (self.cron_trigger_name,)).fetchall())
|
|
|
|
assert actual_scheduled_timestamps == expected_scheduled_timestamps
|