graphql-engine/server/tests-py/test_events.py
Karthikeyan Chinnakonda 44347d2d74 server: template the schema and table names in the event trigger PG functions
Earlier, while creating the event trigger's internal postgres trigger, we used to get the name of the table from the `TG_TABLE_NAME` special trigger variable. Using this with normal tables works fine, but it breaks when the parent table is partitioned because we associate the ET configuration in the schema only with the original table (as it should be).

In this PR, we supply the table name and schema name through template variables instead of using `TG_TABLE_NAME` and `TG_TABLE_SCHEMA`, so that event triggers work with a partitioned table as well.

TODO:

- [x] Changelog
- [x] unit test (ET on partition table)

GitOrigin-RevId: 556376881a85525300dcf64da0611ee9ad387eb0
2021-01-06 20:22:34 +00:00

682 lines
24 KiB
Python

#!/usr/bin/env python3
import pytest
import queue
import time
from validate import check_query_f, check_event
# Shorthand for the ``pytest.mark.usefixtures`` decorator applied to the test
# classes in this module.
usefixtures = pytest.mark.usefixtures
# Every test in this module requires the events webhook (the ``evts_webhook``
# fixture) to be running first.
# The whole module is also marked as allowed to run as a server upgrade test;
# a few tests opt out individually with the ``skip_server_upgrade_test`` mark.
pytestmark = [usefixtures('evts_webhook'), pytest.mark.allow_server_upgrade_test]
def select_last_event_fromdb(hge_ctx):
    """Select the most recently created row of ``hdb_catalog.event_log``.

    The query is sent through ``hge_ctx.v1q``; the ``(status_code, response)``
    pair it produces is returned unchanged.
    """
    select_args = {
        "table": {"schema": "hdb_catalog", "name": "event_log"},
        "columns": ["*"],
        # Newest first, and only that single row.
        "order_by": ["-created_at"],
        "limit": 1
    }
    status, body = hge_ctx.v1q({"type": "select", "args": select_args})
    return status, body
def insert(hge_ctx, table, row, returning=None, headers=None):
    """Insert a single ``row`` into ``table`` by delegating to ``insert_many``.

    ``returning`` defaults to an empty column list and ``headers`` to an empty
    dict. ``None`` is used as the sentinel so that no list/dict object is
    shared between calls (the mutable-default-argument pitfall).

    Returns whatever ``insert_many`` returns: ``(status_code, response)``.
    """
    # Normalise here rather than forwarding ``None`` so this function does not
    # rely on how ``insert_many`` treats a ``None`` argument.
    returning = [] if returning is None else returning
    headers = {} if headers is None else headers
    return insert_many(hge_ctx, table, [row], returning, headers)
def insert_many(hge_ctx, table, rows, returning=None, headers=None):
    """Insert ``rows`` into ``table`` with a v1 ``insert`` query.

    Args:
        hge_ctx: context object exposing ``v1q(query, headers=...)``.
        table: table reference placed verbatim in the query.
        rows: list of row objects to insert.
        returning: columns to return; defaults to a fresh empty list.
        headers: request headers; defaults to a fresh empty dict.

    Returns:
        The ``(status_code, response)`` pair from ``hge_ctx.v1q``.
    """
    # ``None`` sentinels avoid sharing one default list/dict across calls:
    # the list is embedded in the query and the dict is handed to ``v1q``,
    # so any in-place modification downstream would otherwise leak.
    returning = [] if returning is None else returning
    headers = {} if headers is None else headers
    q = {
        "type": "insert",
        "args": {
            "table": table,
            "objects": rows,
            "returning": returning
        }
    }
    st_code, resp = hge_ctx.v1q(q, headers=headers)
    return st_code, resp
def update(hge_ctx, table, where_exp, set_exp, headers=None):
    """Run a v1 ``update`` query that applies ``set_exp`` to rows matching ``where_exp``.

    Args:
        hge_ctx: context object exposing ``v1q(query, headers=...)``.
        table: table reference placed verbatim in the query.
        where_exp: boolean expression selecting the rows to update.
        set_exp: column/value mapping sent as the query's ``$set``.
        headers: request headers; defaults to a fresh empty dict.

    Returns:
        The ``(status_code, response)`` pair from ``hge_ctx.v1q``.
    """
    # ``None`` sentinel instead of a shared mutable ``{}`` default.
    headers = {} if headers is None else headers
    q = {
        "type": "update",
        "args": {
            "table": table,
            "where": where_exp,
            "$set": set_exp
        }
    }
    st_code, resp = hge_ctx.v1q(q, headers=headers)
    return st_code, resp
def delete(hge_ctx, table, where_exp, headers=None):
    """Run a v1 ``delete`` query removing the rows of ``table`` matching ``where_exp``.

    Args:
        hge_ctx: context object exposing ``v1q(query, headers=...)``.
        table: table reference placed verbatim in the query.
        where_exp: boolean expression selecting the rows to delete.
        headers: request headers; defaults to a fresh empty dict.

    Returns:
        The ``(status_code, response)`` pair from ``hge_ctx.v1q``.
    """
    # ``None`` sentinel instead of a shared mutable ``{}`` default.
    headers = {} if headers is None else headers
    q = {
        "type": "delete",
        "args": {
            "table": table,
            "where": where_exp
        }
    }
    st_code, resp = hge_ctx.v1q(q, headers=headers)
    return st_code, resp
@usefixtures('per_method_tests_db_state')
class TestCreateAndDelete:
    """Event-trigger create/delete/reset scenarios described in YAML query files."""

    @classmethod
    def dir(cls):
        """Directory that holds the YAML files used by the tests of this class."""
        return 'queries/event_triggers/create-delete'

    def test_create_delete(self, hge_ctx):
        check_query_f(hge_ctx, '{}/create_and_delete.yaml'.format(self.dir()))

    def test_create_reset(self, hge_ctx):
        check_query_f(hge_ctx, '{}/create_and_reset.yaml'.format(self.dir()))

    # Can't run server upgrade tests, as this test has a schema change
    @pytest.mark.skip_server_upgrade_test
    def test_create_operation_spec_not_provider_err(self, hge_ctx):
        check_query_f(
            hge_ctx,
            '{}/create_trigger_operation_specs_not_provided_err.yaml'.format(self.dir()),
        )
# Smoke test for handling a backlog of events
@usefixtures('per_method_tests_db_state')
class TestEventFlood:

    @classmethod
    def dir(cls):
        # NOTE(review): not referenced in this class; presumably read by the
        # db-state fixture to locate setup/teardown files - confirm.
        return 'queries/event_triggers/basic'

    def test_flood(self, hge_ctx, evts_webhook):
        """Insert 1000 rows in one request and expect one webhook event per row."""
        table = {"schema": "hge_tests", "name": "test_t1"}
        expected_ids = list(range(1, 1001))
        rows = [{"c1": n, "c2": "hello"} for n in expected_ids]
        status, body = insert_many(hge_ctx, table, rows)
        assert status == 200, body

        received_ids = []
        for _ in expected_ids:
            # TODO ThreadedHTTPServer helps locally (I only need a timeout of
            # 10 here), but we still need a bit of a long timeout here for CI
            # it seems, since webhook can't keep up there:
            event = evts_webhook.get_event(600)
            received_ids.append(event['body']['event']['data']['new']['c1'])

        # Make sure we got all payloads (probably out of order):
        assert sorted(received_ids) == expected_ids
@usefixtures("per_class_tests_db_state")
class TestCreateEvtQuery(object):
    """Checks the events delivered for the ``t1_all`` and ``measurement_all`` triggers."""

    @classmethod
    def dir(cls):
        """Directory holding the YAML files used by this class."""
        return 'queries/event_triggers/basic'

    def test_basic(self, hge_ctx, evts_webhook):
        """INSERT, UPDATE and DELETE on ``test_t1`` each deliver a matching event."""
        table = {"schema": "hge_tests", "name": "test_t1"}

        init_row = {"c1": 1, "c2": "hello"}
        exp_ev_data = {
            "old": None,
            "new": init_row
        }
        st_code, resp = insert(hge_ctx, table, init_row)
        assert st_code == 200, resp
        check_event(hge_ctx, evts_webhook, "t1_all", table, "INSERT", exp_ev_data)

        where_exp = {"c1": 1}
        set_exp = {"c2": "world"}
        exp_ev_data = {
            "old": init_row,
            "new": {"c1": 1, "c2": "world"}
        }
        st_code, resp = update(hge_ctx, table, where_exp, set_exp)
        assert st_code == 200, resp
        check_event(hge_ctx, evts_webhook, "t1_all", table, "UPDATE", exp_ev_data)

        exp_ev_data = {
            "old": {"c1": 1, "c2": "world"},
            "new": None
        }
        st_code, resp = delete(hge_ctx, table, where_exp)
        assert st_code == 200, resp
        check_event(hge_ctx, evts_webhook, "t1_all", table, "DELETE", exp_ev_data)

    def test_partitioned_table_basic_insert(self, hge_ctx, evts_webhook):
        """An INSERT into ``hge_tests.measurement`` delivers a ``measurement_all`` event.

        The table is created by ``partition_table_setup.yaml`` and removed by
        ``partition_table_teardown.yaml``; the teardown runs even when the
        insert or the event check fails, so a failure here does not leave the
        table behind for the rest of the class.
        """
        if hge_ctx.pg_version < 110000:
            # pytest.skip() raises, so nothing after it in this branch runs.
            pytest.skip('Event triggers on partioned tables are not supported in Postgres versions < 11')

        st_code, resp = hge_ctx.v1q_f(self.dir() + '/partition_table_setup.yaml')
        assert st_code == 200, resp
        try:
            table = {"schema": "hge_tests", "name": "measurement"}
            init_row = {"city_id": 1, "logdate": "2006-02-02", "peaktemp": 1, "unitsales": 1}
            exp_ev_data = {
                "old": None,
                "new": init_row
            }
            st_code, resp = insert(hge_ctx, table, init_row)
            assert st_code == 200, resp
            check_event(hge_ctx, evts_webhook, "measurement_all", table, "INSERT", exp_ev_data)
        finally:
            st_code, resp = hge_ctx.v1q_f(self.dir() + '/partition_table_teardown.yaml')
            assert st_code == 200, resp
@usefixtures('per_method_tests_db_state')
class TestRetryConf(object):
    """Counts failed webhook deliveries after inserting one row per table.

    NOTE(review): the expected counts (5 and 3) presumably follow from the
    retry/timeout settings in the YAML setup under ``dir()`` - confirm there.
    """

    @classmethod
    def dir(cls):
        return 'queries/event_triggers/retry_conf'

    def test_basic(self, hge_ctx, evts_webhook):
        """After an insert into ``test_t1``, 5 failed deliveries are recorded within 15 s."""
        table = {"schema": "hge_tests", "name": "test_t1"}
        init_row = {"c1": 1, "c2": "hello"}
        st_code, resp = insert(hge_ctx, table, init_row)
        assert st_code == 200, resp
        # Give the server time to attempt every delivery before counting.
        time.sleep(15)
        tries = evts_webhook.get_error_queue_size()
        assert tries == 5, tries

    def test_timeout_short(self, hge_ctx, evts_webhook):
        """After an insert into ``test_t2``, 3 failed deliveries are recorded within 20 s."""
        table = {"schema": "hge_tests", "name": "test_t2"}
        init_row = {"c1": 1, "c2": "hello"}
        st_code, resp = insert(hge_ctx, table, init_row)
        assert st_code == 200, resp
        time.sleep(20)
        tries = evts_webhook.get_error_queue_size()
        assert tries == 3, tries

    def test_timeout_long(self, hge_ctx, evts_webhook):
        """An insert into ``test_t3`` is delivered to ``/timeout_long`` after a 15 s wait."""
        table = {"schema": "hge_tests", "name": "test_t3"}
        init_row = {"c1": 1, "c2": "hello"}
        exp_ev_data = {
            "old": None,
            "new": init_row
        }
        st_code, resp = insert(hge_ctx, table, init_row)
        assert st_code == 200, resp
        time.sleep(15)
        check_event(hge_ctx, evts_webhook, "t3_timeout_long", table, "INSERT", exp_ev_data, webhook_path = "/timeout_long")
@usefixtures('per_method_tests_db_state')
class TestEvtHeaders:
    """Checks the headers that accompany a delivered event."""

    @classmethod
    def dir(cls):
        return 'queries/event_triggers/headers'

    def test_basic(self, hge_ctx, evts_webhook):
        """An INSERT event for ``t1_all`` is checked against two expected headers."""
        table = {"schema": "hge_tests", "name": "test_t1"}
        row = {"c1": 1, "c2": "hello"}

        status, body = insert(hge_ctx, table, row)
        assert status == 200, body

        expected_headers = {
            "X-Header-From-Value": "MyValue",
            "X-Header-From-Env": "MyEnvValue",
        }
        expected_data = {"old": None, "new": row}
        check_event(
            hge_ctx, evts_webhook, "t1_all", table, "INSERT", expected_data,
            headers=expected_headers,
        )
class TestUpdateEvtQuery:
    """Checks the ``t1_cols`` trigger after it has been updated to use the ``/new`` webhook path."""

    @pytest.fixture(autouse=True)
    def transact(self, request, hge_ctx, evts_webhook):
        """Create the trigger, apply the update, run the test, then tear down."""
        base = 'queries/event_triggers/update_query'
        print("In setup method")

        status, body = hge_ctx.v1q_f(base + '/create-setup.yaml')
        assert status == 200, body

        status, body = hge_ctx.v1q_f(base + '/update-setup.yaml')
        assert status == 200, '{}'.format(body)
        # The second item of the response carries the metadata; the trigger's
        # webhook must now be the updated URL.
        trigger = body[1]["sources"][0]["tables"][0]["event_triggers"][0]
        assert trigger["webhook"] == 'http://127.0.0.1:5592/new'

        yield

        status, body = hge_ctx.v1q_f(base + '/teardown.yaml')
        assert status == 200, body

    def test_update_basic(self, hge_ctx, evts_webhook):
        """Only an UPDATE of ``c3`` or ``c1``, and a DELETE, deliver an event."""
        table = {"schema": "hge_tests", "name": "test_t1"}

        # INSERT: no event is delivered.
        row = {"c1": 1, "c2": "hello", "c3": {"name": "clarke"}}
        expected = {
            "old": None,
            "new": {"c1": 1, "c2": "hello", "c3": {"name": "clarke"}}
        }
        status, body = insert(hge_ctx, table, row)
        assert status == 200, body
        with pytest.raises(queue.Empty):
            check_event(hge_ctx, evts_webhook, "t1_cols", table, "INSERT", expected, webhook_path="/new")

        # UPDATE of c2: expected no event hence previous expected data
        status, body = update(hge_ctx, table, {"c1": 1}, {"c2": "world"})
        assert status == 200, body
        with pytest.raises(queue.Empty):
            check_event(hge_ctx, evts_webhook, "t1_cols", table, "UPDATE", expected, webhook_path="/new")

        # UPDATE of c3: event delivered.
        expected = {
            "old": {"c1": 1, "c2": "world", "c3": {"name": "clarke"}},
            "new": {"c1": 1, "c2": "world", "c3": {"name": "bellamy"}}
        }
        status, body = update(hge_ctx, table, {"c1": 1}, {"c3": {"name": "bellamy"}})
        assert status == 200, body
        check_event(hge_ctx, evts_webhook, "t1_cols", table, "UPDATE", expected, webhook_path="/new")

        # UPDATE of c1: event delivered.
        expected = {
            "old": {"c1": 1, "c2": "world", "c3": {"name": "bellamy"}},
            "new": {"c1": 2, "c2": "world", "c3": {"name": "bellamy"}}
        }
        status, body = update(hge_ctx, table, {"c1": 1}, {"c1": 2})
        assert status == 200, body
        check_event(hge_ctx, evts_webhook, "t1_cols", table, "UPDATE", expected, webhook_path="/new")

        # DELETE: event delivered.
        expected = {
            "old": {"c1": 2, "c2": "world", "c3": {"name": "bellamy"}},
            "new": None
        }
        status, body = delete(hge_ctx, table, {"c1": 2})
        assert status == 200, body
        check_event(hge_ctx, evts_webhook, "t1_cols", table, "DELETE", expected, webhook_path="/new")
@usefixtures('per_method_tests_db_state')
class TestDeleteEvtQuery:
    """After the delete_query setup, no ``t1_all`` event is delivered for any operation."""

    directory = 'queries/event_triggers'

    # NOTE(review): these lists are not used in this class; presumably the
    # db-state fixture reads them - confirm.
    setup_files = [
        directory + '/basic/setup.yaml',
        directory + '/delete_query/setup.yaml'
    ]

    teardown_files = [directory + '/delete_query/teardown.yaml']

    def test_delete_basic(self, hge_ctx, evts_webhook):
        table = {"schema": "hge_tests", "name": "test_t1"}
        row = {"c1": 1, "c2": "hello"}
        match_c1 = {"c1": 1}

        # INSERT
        expected = {"old": None, "new": row}
        status, body = insert(hge_ctx, table, row)
        assert status == 200, body
        with pytest.raises(queue.Empty):
            check_event(hge_ctx, evts_webhook, "t1_all", table, "INSERT", expected)

        # UPDATE
        expected = {"old": row, "new": {"c1": 1, "c2": "world"}}
        status, body = update(hge_ctx, table, match_c1, {"c2": "world"})
        assert status == 200, body
        with pytest.raises(queue.Empty):
            check_event(hge_ctx, evts_webhook, "t1_all", table, "UPDATE", expected)

        # DELETE
        expected = {"old": {"c1": 1, "c2": "world"}, "new": None}
        status, body = delete(hge_ctx, table, match_c1)
        assert status == 200, body
        with pytest.raises(queue.Empty):
            check_event(hge_ctx, evts_webhook, "t1_all", table, "DELETE", expected)
@usefixtures('per_class_tests_db_state')
class TestEvtSelCols:
    """Checks the ``t1_cols`` trigger, which does not fire for an update of ``c2``."""

    @classmethod
    def dir(cls):
        return 'queries/event_triggers/selected_cols'

    def test_selected_cols(self, hge_ctx, evts_webhook):
        """INSERT, an update of ``c1`` and DELETE deliver events; an update of ``c2`` does not."""
        table = {"schema": "hge_tests", "name": "test_t1"}

        # INSERT: event delivered.
        row = {"c1": 1, "c2": "hello"}
        expected = {
            "old": None,
            "new": {"c1": 1, "c2": "hello"}
        }
        status, body = insert(hge_ctx, table, row)
        assert status == 200, body
        check_event(hge_ctx, evts_webhook, "t1_cols", table, "INSERT", expected)

        # UPDATE of c2: expected no event hence previous expected data
        status, body = update(hge_ctx, table, {"c1": 1}, {"c2": "world"})
        assert status == 200, body
        with pytest.raises(queue.Empty):
            check_event(hge_ctx, evts_webhook, "t1_cols", table, "UPDATE", expected)

        # UPDATE of c1: event delivered.
        expected = {
            "old": {"c1": 1, "c2": "world"},
            "new": {"c1": 2, "c2": "world"}
        }
        status, body = update(hge_ctx, table, {"c1": 1}, {"c1": 2})
        assert status == 200, body
        check_event(hge_ctx, evts_webhook, "t1_cols", table, "UPDATE", expected)

        # DELETE: event delivered.
        expected = {
            "old": {"c1": 2, "c2": "world"},
            "new": None
        }
        status, body = delete(hge_ctx, table, {"c1": 2})
        assert status == 200, body
        check_event(hge_ctx, evts_webhook, "t1_cols", table, "DELETE", expected)

    @pytest.mark.skip_server_upgrade_test
    def test_selected_cols_dep(self, hge_ctx, evts_webhook):
        """Dropping ``c1`` is rejected with a dependency error; dropping ``c2`` succeeds."""
        drop_c1 = {
            "type": "run_sql",
            "args": {
                "sql": "alter table hge_tests.test_t1 drop column c1"
            }
        }
        status, body = hge_ctx.v1q(drop_c1)
        assert status == 400, body
        assert body['code'] == "dependency-error", body

        drop_c2 = {
            "type": "run_sql",
            "args": {
                "sql": "alter table hge_tests.test_t1 drop column c2"
            }
        }
        status, body = hge_ctx.v1q(drop_c2)
        assert status == 200, body
@usefixtures('per_method_tests_db_state')
class TestEvtInsertOnly:
    """Checks the ``t1_insert`` trigger, which delivers events for INSERT only."""

    @classmethod
    def dir(cls):
        return 'queries/event_triggers/insert_only'

    def test_insert_only(self, hge_ctx, evts_webhook):
        table = {"schema": "hge_tests", "name": "test_t1"}
        row = {"c1": 1, "c2": "hello"}
        match_c1 = {"c1": 1}

        # INSERT: event delivered.
        expected = {"old": None, "new": row}
        status, body = insert(hge_ctx, table, row)
        assert status == 200, body
        check_event(hge_ctx, evts_webhook, "t1_insert", table, "INSERT", expected)

        # UPDATE: nothing arrives at the webhook.
        expected = {"old": row, "new": {"c1": 1, "c2": "world"}}
        status, body = update(hge_ctx, table, match_c1, {"c2": "world"})
        assert status == 200, body
        with pytest.raises(queue.Empty):
            check_event(hge_ctx, evts_webhook, "t1_insert", table, "UPDATE", expected)

        # DELETE: nothing arrives at the webhook.
        expected = {"old": {"c1": 1, "c2": "world"}, "new": None}
        status, body = delete(hge_ctx, table, match_c1)
        assert status == 200, body
        with pytest.raises(queue.Empty):
            check_event(hge_ctx, evts_webhook, "t1_insert", table, "DELETE", expected)
@usefixtures('per_class_tests_db_state')
class TestEvtSelPayload:
    """Checks the ``t1_payload`` trigger, whose event payloads carry only some columns."""

    @classmethod
    def dir(cls):
        return 'queries/event_triggers/selected_payload'

    def test_selected_payload(self, hge_ctx, evts_webhook):
        """Expected payloads: both columns on INSERT, ``c1`` on UPDATE, ``c2`` on DELETE."""
        table = {"schema": "hge_tests", "name": "test_t1"}

        # INSERT
        row = {"c1": 1, "c2": "hello"}
        expected = {
            "old": None,
            "new": {"c1": 1, "c2": "hello"}
        }
        status, body = insert(hge_ctx, table, row)
        assert status == 200, body
        check_event(hge_ctx, evts_webhook, "t1_payload", table, "INSERT", expected)

        # UPDATE of c2
        expected = {
            "old": {"c1": 1},
            "new": {"c1": 1}
        }
        status, body = update(hge_ctx, table, {"c1": 1}, {"c2": "world"})
        assert status == 200, body
        check_event(hge_ctx, evts_webhook, "t1_payload", table, "UPDATE", expected)

        # UPDATE of c1
        expected = {
            "old": {"c1": 1},
            "new": {"c1": 2}
        }
        status, body = update(hge_ctx, table, {"c1": 1}, {"c1": 2})
        assert status == 200, body
        check_event(hge_ctx, evts_webhook, "t1_payload", table, "UPDATE", expected)

        # DELETE
        expected = {
            "old": {"c2": "world"},
            "new": None
        }
        status, body = delete(hge_ctx, table, {"c1": 2})
        assert status == 200, body
        check_event(hge_ctx, evts_webhook, "t1_payload", table, "DELETE", expected)

    def test_selected_payload_dep(self, hge_ctx):
        """Dropping either ``c1`` or ``c2`` is rejected with a dependency error."""
        drop_c1 = {
            "type": "run_sql",
            "args": {
                "sql": "alter table hge_tests.test_t1 drop column c1"
            }
        }
        status, body = hge_ctx.v1q(drop_c1)
        assert status == 400, body
        assert body['code'] == "dependency-error", body

        drop_c2 = {
            "type": "run_sql",
            "args": {
                "sql": "alter table hge_tests.test_t1 drop column c2"
            }
        }
        status, body = hge_ctx.v1q(drop_c2)
        assert status == 400, body
        assert body['code'] == "dependency-error", body
@usefixtures('per_method_tests_db_state')
class TestWebhookEnv:
    """INSERT/UPDATE/DELETE event checks for ``t1_all`` using the webhook_env setup."""

    @classmethod
    def dir(cls):
        return 'queries/event_triggers/webhook_env'

    def test_basic(self, hge_ctx, evts_webhook):
        table = {"schema": "hge_tests", "name": "test_t1"}
        row = {"c1": 1, "c2": "hello"}
        match_c1 = {"c1": 1}

        # INSERT
        expected = {"old": None, "new": row}
        status, body = insert(hge_ctx, table, row)
        assert status == 200, body
        check_event(hge_ctx, evts_webhook, "t1_all", table, "INSERT", expected)

        # UPDATE
        expected = {"old": row, "new": {"c1": 1, "c2": "world"}}
        status, body = update(hge_ctx, table, match_c1, {"c2": "world"})
        assert status == 200, body
        check_event(hge_ctx, evts_webhook, "t1_all", table, "UPDATE", expected)

        # DELETE
        expected = {"old": {"c1": 1, "c2": "world"}, "new": None}
        status, body = delete(hge_ctx, table, match_c1)
        assert status == 200, body
        check_event(hge_ctx, evts_webhook, "t1_all", table, "DELETE", expected)
@usefixtures('per_method_tests_db_state')
class TestWebhookTemplateURL:
    """INSERT/UPDATE/DELETE event checks for ``t1_all`` on the ``/trigger`` webhook path."""

    @classmethod
    def dir(cls):
        return 'queries/event_triggers/webhook_template_url'

    def test_basic(self, hge_ctx, evts_webhook):
        table = {"schema": "hge_tests", "name": "test_t1"}
        row = {"c1": 1, "c2": "hello"}
        match_c1 = {"c1": 1}

        # INSERT
        expected = {"old": None, "new": row}
        status, body = insert(hge_ctx, table, row)
        assert status == 200, body
        check_event(hge_ctx, evts_webhook, "t1_all", table, "INSERT", expected, webhook_path='/trigger')

        # UPDATE
        expected = {"old": row, "new": {"c1": 1, "c2": "world"}}
        status, body = update(hge_ctx, table, match_c1, {"c2": "world"})
        assert status == 200, body
        check_event(hge_ctx, evts_webhook, "t1_all", table, "UPDATE", expected, webhook_path='/trigger')

        # DELETE
        expected = {"old": {"c1": 1, "c2": "world"}, "new": None}
        status, body = delete(hge_ctx, table, match_c1)
        assert status == 200, body
        check_event(hge_ctx, evts_webhook, "t1_all", table, "DELETE", expected, webhook_path='/trigger')
@usefixtures('per_method_tests_db_state')
class TestSessionVariables:
    """Checks the session variables attached to delivered ``t1_all`` events."""

    @classmethod
    def dir(cls):
        return 'queries/event_triggers/basic'

    def test_basic(self, hge_ctx, evts_webhook):
        table = {"schema": "hge_tests", "name": "test_t1"}
        row = {"c1": 1, "c2": "hello"}
        match_c1 = {"c1": 1}

        # INSERT: the request headers are the expected session variables.
        session_vars = {
            'x-hasura-role': 'admin',
            'x-hasura-allowed-roles': "['admin','user']",
            'x-hasura-user-id': '1'
        }
        expected = {"old": None, "new": row}
        status, body = insert(hge_ctx, table, row, headers=session_vars)
        assert status == 200, body
        check_event(hge_ctx, evts_webhook, "t1_all", table, "INSERT", expected, session_variables=session_vars)

        # UPDATE: one extra header is sent that is not a session variable.
        session_vars = {
            'x-hasura-role': 'admin',
            'x-hasura-random': 'some_random_info',
            'X-Random-Header': 'not_session_variable'
        }
        expected = {"old": row, "new": {"c1": 1, "c2": "world"}}
        status, body = update(hge_ctx, table, match_c1, {"c2": "world"}, headers=session_vars)
        assert status == 200, body
        # Drop the extra header before comparing with the event.
        del session_vars['X-Random-Header']
        check_event(hge_ctx, evts_webhook, "t1_all", table, "UPDATE", expected, session_variables=session_vars)

        # DELETE: no headers sent, no session variables passed to the check.
        expected = {"old": {"c1": 1, "c2": "world"}, "new": None}
        status, body = delete(hge_ctx, table, match_c1)
        assert status == 200, body
        check_event(hge_ctx, evts_webhook, "t1_all", table, "DELETE", expected)
@usefixtures('per_method_tests_db_state')
class TestManualEvents:
    """Runs the ``enabled.yaml`` and ``disabled.yaml`` manual-event queries."""

    @classmethod
    def dir(cls):
        return 'queries/event_triggers/manual_events'

    def test_basic(self, hge_ctx, evts_webhook):
        """``enabled.yaml`` must return 200 and ``disabled.yaml`` must return 400."""
        status, body = hge_ctx.v1q_f('queries/event_triggers/manual_events' + '/enabled.yaml')
        assert status == 200, body

        status, body = hge_ctx.v1q_f('queries/event_triggers/manual_events' + '/disabled.yaml')
        assert status == 400, body
@usefixtures('per_method_tests_db_state')
class TestEventsAsynchronousExecution:

    @classmethod
    def dir(cls):
        return 'queries/event_triggers/async_execution'

    def test_async_execution(self, hge_ctx, evts_webhook):
        """
        A test to check if the events generated by the graphql-engine are
        processed asynchronously. This test measures the time taken to process
        all the events and that time should definitely be less than the time
        taken if the events were to be executed sequentially.
        This test inserts 5 rows and the webhook (/timeout_long) takes
        ~5 seconds to process one request. So, if the graphql-engine
        were to process the events sequentially it would take 5 * 5 = 25 seconds.
        Theoretically, all the events should have been processed in ~5 seconds;
        a 5 second buffer is added to the comparison so that this test
        doesn't flake in the CI.
        """
        table = {"schema": "hge_tests", "name": "test_t1"}
        row_ids = range(1, 6)
        rows = [{"c1": n, "c2": "hello"} for n in row_ids]

        status, body = insert_many(hge_ctx, table, rows)
        # The clock starts once the insert request has returned.
        started = time.perf_counter()
        assert status == 200, body

        for _ in row_ids:
            evts_webhook.get_event(7)  # webhook takes 5 seconds to process a request

        elapsed = time.perf_counter() - started
        assert elapsed < 10