add test for checking the value of created_at

This commit is contained in:
Karthikeyan Chinnakonda 2020-09-07 13:07:53 +05:30
parent ee6797a8d8
commit b548a8e008

View File

@ -39,7 +39,8 @@ class TestScheduledEvent(object):
"webhook":'{{SCHEDULED_TRIGGERS_WEBHOOK_DOMAIN}}/test',
"schedule_at":stringify_datetime(datetime.utcnow()),
"payload":self.webhook_payload,
"headers":self.header_conf
"headers":self.header_conf,
"comment":"test scheduled event"
}
}
st, resp = hge_ctx.v1q(query)
@ -78,10 +79,24 @@ class TestScheduledEvent(object):
assert st == 200, resp
def test_check_fired_webhook_event(self,hge_ctx,scheduled_triggers_evts_webhook):
query = {
"type":"run_sql",
"args":{
"sql":'''
select timezone('utc',created_at) as created_at
from hdb_catalog.hdb_scheduled_events
where comment = 'test scheduled event';
'''
}
}
st, resp = hge_ctx.v1q(query)
assert st == 200, resp
db_created_at = resp['result'][1][0]
event = scheduled_triggers_evts_webhook.get_event(65)
validate_event_webhook(event['path'],'/test')
validate_event_headers(event['headers'],{"header-key":"header-value"})
assert event['body']['payload'] == self.webhook_payload
assert event['body']['created_at'] == db_created_at.replace(" ","T") + "Z"
payload_keys = dict.keys(event['body'])
for k in ["scheduled_time","created_at","id"]: # additional keys
assert k in payload_keys
@ -215,7 +230,6 @@ class TestCronTrigger(object):
}
st,resp = hge_ctx.v1q(q)
assert st == 200,resp
print ("resp",resp['result'][1][0])
assert json.loads(resp['result'][1][0]) == [{
"name":"header-name",
"value":"header-value"
@ -245,6 +259,45 @@ class TestCronTrigger(object):
actual_schedule_timestamps.append(datetime_ts)
assert actual_schedule_timestamps == expected_schedule_timestamps
def test_check_fired_webhook_event(self, hge_ctx, scheduled_triggers_evts_webhook):
q = {
"type":"create_cron_trigger",
"args":{
"name":"test_cron_trigger",
"webhook":"{{SCHEDULED_TRIGGERS_WEBHOOK_DOMAIN}}" + "/test",
"schedule":"* * * * *",
"headers":[
{
"name":"header-key",
"value":"header-value"
}
],
"payload":{"foo":"baz"},
"include_in_metadata":False
}
}
st,resp = hge_ctx.v1q(q)
assert st == 200, resp
query = {
"type":"run_sql",
"args":{
"sql":'''
select timezone('utc',created_at) as created_at
from hdb_catalog.hdb_cron_triggers
where name = 'test_cron_trigger'
'''
}
}
st, resp = hge_ctx.v1q(query)
assert st == 200, resp
db_created_at = resp['result'][1][0]
event = scheduled_triggers_evts_webhook.get_event(60)
validate_event_webhook(event['path'],'/test')
validate_event_headers(event['headers'],{"header-key":"header-value"})
assert event['body']['payload'] == {"foo":"baz"}
assert event['body']['created_at'] == db_created_at.replace(" ","T") + "Z"
assert event['body']['name'] == 'test_cron_trigger'
def test_delete_cron_scheduled_trigger(self,hge_ctx):
q = {
"type":"delete_cron_trigger",
@ -254,3 +307,11 @@ class TestCronTrigger(object):
}
st,resp = hge_ctx.v1q(q)
assert st == 200,resp
q = {
"type":"delete_cron_trigger",
"args":{
"name":"test_cron_trigger"
}
}
st,resp = hge_ctx.v1q(q)
assert st == 200,resp