implement query to update an event trigger (#367)

This commit is contained in:
Tirumarai Selvan 2018-09-19 17:42:57 +05:30 committed by Shahidh K Muhammed
parent 9d0ec2f8ba
commit c42af444f7
23 changed files with 486 additions and 123 deletions

View File

@ -223,10 +223,10 @@ applyQP2 (ReplaceMetadata tables templates) = do
withPathK "delete_permissions" $ processPerms tabInfo $
table ^. tmDeletePermissions
indexedForM_ tables $ \table -> do
indexedForM_ tables $ \table ->
withPathK "event_triggers" $
indexedForM_ (table ^. tmEventTriggers) $ \et ->
DS.subTableP2 (table ^. tmTable) et
DS.subTableP2 (table ^. tmTable) False et
-- query templates
withPathK "queryTemplates" $
@ -408,8 +408,7 @@ instance HDBQuery DumpInternalState where
type Phase1Res DumpInternalState = ()
phaseOne _ = adminOnly
phaseTwo _ _ = do
sc <- askSchemaCache
return $ encode sc
phaseTwo _ _ =
encode <$> askSchemaCache
schemaCachePolicy = SCPNoChange

View File

@ -106,7 +106,7 @@ mkTriggerQ trid trn (QualifiedTable sn tn) (TriggerOpsDef insert update delete)
addEventTriggerToCatalog :: QualifiedTable -> EventTriggerDef
-> Q.TxE QErr TriggerId
addEventTriggerToCatalog (QualifiedTable sn tn) (EventTriggerDef name def webhook rconf) = do
addEventTriggerToCatalog qt@(QualifiedTable sn tn) (EventTriggerDef name def webhook rconf) = do
ids <- map runIdentity <$> Q.listQE defaultTxErrorHandler [Q.sql|
INSERT into hdb_catalog.event_triggers (name, type, schema_name, table_name, definition, webhook, num_retries, retry_interval)
VALUES ($1, 'table', $2, $3, $4, $5, $6, $7)
@ -114,14 +114,11 @@ addEventTriggerToCatalog (QualifiedTable sn tn) (EventTriggerDef name def webhoo
|] (name, sn, tn, Q.AltJ $ toJSON def, webhook, toInt64 $ rcNumRetries rconf, toInt64 $ rcIntervalSec rconf) True
trid <- getTrid ids
mkTriggerQ trid name (QualifiedTable sn tn) def
mkTriggerQ trid name qt def
return trid
where
getTrid [] = throw500 "could not create event-trigger"
getTrid (x:_) = return x
toInt64 :: (Integral a) => a -> Int64
toInt64 = fromIntegral
delEventTriggerFromCatalog :: TriggerName -> Q.TxE QErr ()
delEventTriggerFromCatalog trn = do
@ -135,6 +132,29 @@ delEventTriggerFromCatalog trn = do
tx :: Ops -> Q.TxE QErr ()
tx op = Q.multiQE defaultTxErrorHandler (Q.fromBuilder $ TE.encodeUtf8Builder $ getDropFuncSql op trn)
updateEventTriggerToCatalog
:: QualifiedTable
-> EventTriggerDef
-> Q.TxE QErr TriggerId
updateEventTriggerToCatalog qt (EventTriggerDef name def webhook rconf) = do
ids <- map runIdentity <$> Q.listQE defaultTxErrorHandler [Q.sql|
UPDATE hdb_catalog.event_triggers
SET
definition = $1,
webhook = $2,
num_retries = $3,
retry_interval = $4
WHERE name = $5
RETURNING id
|] (Q.AltJ $ toJSON def, webhook, toInt64 $ rcNumRetries rconf, toInt64 $ rcIntervalSec rconf, name) True
trid <- getTrid ids
mkTriggerQ trid name qt def
return trid
where
getTrid [] = throw500 "could not update event-trigger"
getTrid (x:_) = return x
fetchEventTrigger :: TriggerName -> Q.TxE QErr EventTrigger
fetchEventTrigger trn = do
triggers <- Q.listQE defaultTxErrorHandler [Q.sql|
@ -178,15 +198,21 @@ markForDelivery eid =
WHERE id = $1
|] (Identity eid) True
subTableP1 :: (P1C m) => CreateEventTriggerQuery -> m (QualifiedTable, EventTriggerDef)
subTableP1 (CreateEventTriggerQuery name qt insert update delete retryConf webhook) = do
subTableP1 :: (P1C m) => CreateEventTriggerQuery -> m (QualifiedTable, Bool, EventTriggerDef)
subTableP1 (CreateEventTriggerQuery name qt insert update delete retryConf webhook replace) = do
adminOnly
ti <- askTabInfo qt
-- can only replace for same table
when replace $ do
ti' <- askTabInfoFromTrigger name
when (tiName ti' /= tiName ti) $ throw400 NotSupported "cannot replace table or schema for trigger"
assertCols ti insert
assertCols ti update
assertCols ti delete
let rconf = fromMaybe (RetryConf defaultNumRetries defaultRetryInterval) retryConf
return (qt, EventTriggerDef name (TriggerOpsDef insert update delete) webhook rconf)
return (qt, replace, EventTriggerDef name (TriggerOpsDef insert update delete) webhook rconf)
where
assertCols _ Nothing = return ()
assertCols ti (Just sos) = do
@ -195,36 +221,43 @@ subTableP1 (CreateEventTriggerQuery name qt insert update delete retryConf webho
SubCStar -> return ()
SubCArray pgcols -> forM_ pgcols (assertPGCol (tiFieldInfoMap ti) "")
subTableP2 :: (P2C m) => QualifiedTable -> EventTriggerDef -> m ()
subTableP2 qt q@(EventTriggerDef name def webhook rconf) = do
trid <- liftTx $ addEventTriggerToCatalog qt q
subTableP2 :: (P2C m) => QualifiedTable -> Bool -> EventTriggerDef -> m ()
subTableP2 qt replace q@(EventTriggerDef name def webhook rconf) = do
trid <- if replace
then do
delEventTriggerFromCache qt name
liftTx $ updateEventTriggerToCatalog qt q
else
liftTx $ addEventTriggerToCatalog qt q
addEventTriggerToCache qt trid name def rconf webhook
subTableP2shim :: (P2C m) => (QualifiedTable, EventTriggerDef) -> m RespBody
subTableP2shim (qt, etdef) = do
subTableP2 qt etdef
subTableP2shim :: (P2C m) => (QualifiedTable, Bool, EventTriggerDef) -> m RespBody
subTableP2shim (qt, replace, etdef) = do
subTableP2 qt replace etdef
return successMsg
instance HDBQuery CreateEventTriggerQuery where
type Phase1Res CreateEventTriggerQuery = (QualifiedTable, EventTriggerDef)
type Phase1Res CreateEventTriggerQuery = (QualifiedTable, Bool, EventTriggerDef)
phaseOne = subTableP1
phaseTwo _ = subTableP2shim
schemaCachePolicy = SCPReload
unsubTableP1 :: (P1C m) => DeleteEventTriggerQuery -> m ()
unsubTableP1 _ = adminOnly
unsubTableP1 :: (P1C m) => DeleteEventTriggerQuery -> m QualifiedTable
unsubTableP1 (DeleteEventTriggerQuery name) = do
adminOnly
ti <- askTabInfoFromTrigger name
return $ tiName ti
unsubTableP2 :: (P2C m) => DeleteEventTriggerQuery -> m RespBody
unsubTableP2 (DeleteEventTriggerQuery name) = do
et <- liftTx $ fetchEventTrigger name
delEventTriggerFromCache (etTable et) name
unsubTableP2 :: (P2C m) => QualifiedTable -> DeleteEventTriggerQuery -> m RespBody
unsubTableP2 qt (DeleteEventTriggerQuery name) = do
delEventTriggerFromCache qt name
liftTx $ delEventTriggerFromCatalog name
return successMsg
instance HDBQuery DeleteEventTriggerQuery where
type Phase1Res DeleteEventTriggerQuery = ()
type Phase1Res DeleteEventTriggerQuery = QualifiedTable
phaseOne = unsubTableP1
phaseTwo q _ = unsubTableP2 q
phaseTwo q qt = unsubTableP2 qt q
schemaCachePolicy = SCPReload
deliverEvent :: (P2C m) => DeliverEventQuery -> m RespBody
@ -238,3 +271,6 @@ instance HDBQuery DeliverEventQuery where
phaseOne _ = adminOnly
phaseTwo q _ = deliverEvent q
schemaCachePolicy = SCPNoChange
toInt64 :: (Integral a) => a -> Int64
toInt64 = fromIntegral

View File

@ -34,6 +34,8 @@ module Hasura.RQL.Types
, askFieldInfo
, askPGColInfo
, askCurRole
, askEventTriggerInfo
, askTabInfoFromTrigger
, askQTemplateInfo
@ -144,6 +146,23 @@ askTabInfo tabName = do
where
errMsg = "table " <> tabName <<> " does not exist"
askTabInfoFromTrigger
:: (QErrM m, CacheRM m)
=> TriggerName -> m TableInfo
askTabInfoFromTrigger trn = do
sc <- askSchemaCache
let tabInfos = M.elems $ scTables sc
liftMaybe (err400 NotExists errMsg) $ find (isJust.M.lookup trn.tiEventTriggerInfoMap) tabInfos
where
errMsg = "event trigger " <> trn <<> " does not exist"
askEventTriggerInfo
:: (QErrM m, CacheRM m)
=> EventTriggerInfoMap -> TriggerName -> m EventTriggerInfo
askEventTriggerInfo etim trn = liftMaybe (err400 NotExists errMsg) $ M.lookup trn etim
where
errMsg = "event trigger " <> trn <<> " does not exist"
askQTemplateInfo
:: (P1C m)
=> TQueryName

View File

@ -66,6 +66,7 @@ module Hasura.RQL.Types.SchemaCache
, delEventTriggerFromCache
, getOpInfo
, EventTriggerInfo(..)
, EventTriggerInfoMap
, OpTriggerInfo(..)
, TableObjId(..)

View File

@ -20,7 +20,6 @@ module Hasura.RQL.Types.Subscribe
import Data.Aeson
import Data.Aeson.Casing
import Data.Aeson.TH
import Data.Int (Int64)
import Hasura.Prelude
import Hasura.SQL.Types
import Language.Haskell.TH.Syntax (Lift)
@ -69,6 +68,7 @@ data CreateEventTriggerQuery
, cetqDelete :: !(Maybe SubscribeOpSpec)
, cetqRetryConf :: !(Maybe RetryConf)
, cetqWebhook :: !T.Text
, cetqReplace :: !Bool
} deriving (Show, Eq, Lift)
instance FromJSON CreateEventTriggerQuery where
@ -79,7 +79,8 @@ instance FromJSON CreateEventTriggerQuery where
update <- o .:? "update"
delete <- o .:? "delete"
retryConf <- o .:? "retry_conf"
webhook <- o .: "webhook"
webhook <- o .: "webhook"
replace <- o .:? "replace" .!= False
let regex = mkRegex "^\\w+$"
mName = matchRegex regex (T.unpack name)
case mName of
@ -88,7 +89,7 @@ instance FromJSON CreateEventTriggerQuery where
case insert <|> update <|> delete of
Just _ -> return ()
Nothing -> fail "must provide operation spec(s)"
return $ CreateEventTriggerQuery name table insert update delete retryConf webhook
return $ CreateEventTriggerQuery name table insert update delete retryConf webhook replace
parseJSON _ = fail "expecting an object"
$(deriveToJSON (aesonDrop 4 snakeCase){omitNothingFields=True} ''CreateEventTriggerQuery)

View File

@ -29,14 +29,23 @@ class WebhookHandler(http.server.BaseHTTPRequestHandler):
contentLen = self.headers.get('Content-Length')
reqBody = self.rfile.read(int(contentLen)).decode("utf-8")
reqJson = json.loads(reqBody)
reqHeaders = self.headers
reqPath = self.path
self.log_message(json.dumps(reqJson))
self.send_response(HTTPStatus.NO_CONTENT)
self.end_headers()
self.server.resp_queue.put(reqJson)
if reqPath == "/fail":
self.send_response(HTTPStatus.INTERNAL_SERVER_ERROR)
self.end_headers()
self.server.error_queue.put({"path": reqPath, "body": reqJson, "headers": reqHeaders})
else:
self.send_response(HTTPStatus.NO_CONTENT)
self.end_headers()
self.server.resp_queue.put({"path": reqPath, "body": reqJson, "headers": reqHeaders})
class WebhookServer(http.server.HTTPServer):
def __init__(self, resp_queue, server_address):
def __init__(self, resp_queue, error_queue, server_address):
self.resp_queue = resp_queue
self.error_queue = error_queue
super().__init__(server_address, WebhookHandler)
class HGECtx:
@ -44,8 +53,9 @@ class HGECtx:
server_address = ('0.0.0.0', 5592)
self.resp_queue = queue.Queue(maxsize=1)
self.error_queue = queue.Queue()
self.ws_queue = queue.Queue(maxsize=1)
self.httpd = WebhookServer(self.resp_queue, server_address)
self.httpd = WebhookServer(self.resp_queue, self.error_queue, server_address)
self.web_server = threading.Thread(target=self.httpd.serve_forever)
self.web_server.start()
@ -81,6 +91,13 @@ class HGECtx:
def get_event(self, timeout):
return self.resp_queue.get(timeout=timeout)
def get_error_queue_size(self):
sz = 0
while not self.error_queue.empty():
self.error_queue.get()
sz = sz + 1
return sz
def get_ws_event(self, timeout):
return json.loads(self.ws_queue.get(timeout=timeout))

View File

@ -0,0 +1,26 @@
type: bulk
args:
- type: run_sql
args:
sql: |
create table hge_tests.test_t1(
c1 int,
c2 text
);
- type: track_table
args:
schema: hge_tests
name: test_t1
- type: create_event_trigger
args:
name: t1_all
table:
schema: hge_tests
name: test_t1
insert:
columns: "*"
update:
columns: "*"
delete:
columns: "*"
webhook: http://127.0.0.1:5592

View File

@ -0,0 +1,5 @@
type: bulk
args:
- type: delete_event_trigger
args:
name: t1_all

View File

@ -0,0 +1,6 @@
type: bulk
args:
- type: run_sql
args:
sql: |
drop table hge_tests.test_t1

View File

@ -14,7 +14,7 @@ args:
name: test_t1
- type: create_event_trigger
args:
name: t1_all
name: t1_empty
table:
schema: hge_tests
name: test_t1

View File

@ -0,0 +1,9 @@
type: bulk
args:
- type: delete_event_trigger
args:
name: t1_empty
- type: run_sql
args:
sql: |
drop table hge_tests.test_t1

View File

@ -13,7 +13,7 @@ args:
name: test_t1
- type: create_event_trigger
args:
name: t1_all
name: t1_insert
table:
schema: hge_tests
name: test_t1

View File

@ -0,0 +1,9 @@
type: bulk
args:
- type: delete_event_trigger
args:
name: t1_insert
- type: run_sql
args:
sql: |
drop table hge_tests.test_t1

View File

@ -0,0 +1,29 @@
type: bulk
args:
- type: run_sql
args:
sql: |
create table hge_tests.test_t1(
c1 int,
c2 text
);
- type: track_table
args:
schema: hge_tests
name: test_t1
- type: create_event_trigger
args:
name: t1_retry
table:
schema: hge_tests
name: test_t1
insert:
columns: "*"
update:
columns: "*"
delete:
columns: "*"
webhook: http://127.0.0.1:5592/fail
retry_conf:
num_retries: 4
interval_sec: 2

View File

@ -0,0 +1,9 @@
type: bulk
args:
- type: delete_event_trigger
args:
name: t1_retry
- type: run_sql
args:
sql: |
drop table hge_tests.test_t1

View File

@ -14,7 +14,7 @@ args:
name: test_t1
- type: create_event_trigger
args:
name: t1_all
name: t1_cols
table:
schema: hge_tests
name: test_t1

View File

@ -2,7 +2,7 @@ type: bulk
args:
- type: delete_event_trigger
args:
name: t1_all
name: t1_cols
- type: run_sql
args:
sql: |

View File

@ -0,0 +1,27 @@
type: bulk
args:
- type: run_sql
args:
sql: |
create table hge_tests.test_t1(
c1 int,
c2 text
);
- type: track_table
args:
schema: hge_tests
name: test_t1
- type: create_event_trigger
args:
name: t1_cols
table:
schema: hge_tests
name: test_t1
insert:
columns: ["c2"]
update:
columns: ["c1"]
webhook: http://127.0.0.1:5592
retry_conf:
num_retries: 5
interval_sec: 5

View File

@ -2,7 +2,7 @@ type: bulk
args:
- type: delete_event_trigger
args:
name: t1_all
name: t1_cols
- type: run_sql
args:
sql: |

View File

@ -0,0 +1,19 @@
type: bulk
args:
- type: create_event_trigger
args:
name: t1_cols
table:
schema: hge_tests
name: test_t1
insert:
columns: ["c1"]
update:
columns: ["c2"]
delete:
columns: "*"
webhook: http://127.0.0.1:5592/new
retry_conf:
num_retries: 5
interval_sec: 5
replace: true

View File

@ -3,22 +3,71 @@
import pytest
import queue
import yaml
from validate import check_delete, check_update, check_insert
import time
from validate import check_event
class TestEvtBasic(object):
def select_last_event_fromdb(hge_ctx):
q = {
"type": "select",
"args": {
"table": {"schema": "hdb_catalog", "name": "event_log"},
"columns": ["*"],
"order_by": ["-created_at"],
"limit": 1
}
}
st_code, resp = hge_ctx.v1q(q)
return st_code, resp
def insert(hge_ctx, table, row, returning = []):
q = {
"type": "insert",
"args": {
"table": table,
"objects": [row],
"returning": returning
}
}
st_code, resp = hge_ctx.v1q(q)
return st_code, resp
def update(hge_ctx, table, where_exp, set_exp):
q = {
"type": "update",
"args": {
"table": table,
"where": where_exp,
"$set": set_exp
}
}
st_code, resp = hge_ctx.v1q(q)
return st_code, resp
def delete(hge_ctx, table, where_exp):
q = {
"type": "delete",
"args": {
"table": table,
"where": where_exp
}
}
st_code, resp = hge_ctx.v1q(q)
return st_code, resp
class TestCreateEvtQuery(object):
@pytest.fixture(autouse=True)
def transact(self, request, hge_ctx):
print ("In setup method")
st_code, resp = hge_ctx.v1q_f('queries/basic/setup.yaml')
st_code, resp = hge_ctx.v1q_f('queries/event_triggers/basic/setup.yaml')
assert st_code == 200, resp
yield
st_code, resp = hge_ctx.v1q_f('queries/basic/teardown.yaml')
st_code, resp = hge_ctx.v1q_f('queries/event_triggers/basic/teardown.yaml')
assert st_code == 200, resp
def test_basic(self,hge_ctx):
def test_basic(self, hge_ctx):
table = {"schema" : "hge_tests", "name": "test_t1"}
init_row = {"c1" : 1, "c2" : "hello"}
@ -26,7 +75,10 @@ class TestEvtBasic(object):
"old": None,
"new": init_row
}
check_insert(hge_ctx, "t1_all", table, init_row, exp_ev_data)
headers = {}
st_code, resp = insert(hge_ctx, table, init_row)
assert st_code == 200, resp
check_event(hge_ctx, "t1_all", table, "INSERT", exp_ev_data, headers, "/")
where_exp = {"c1": 1}
set_exp = {"c2" : "world"}
@ -34,14 +86,17 @@ class TestEvtBasic(object):
"old": init_row,
"new": {"c1" : 1, "c2" : "world"}
}
check_update(hge_ctx, "t1_all", table, init_row, where_exp, set_exp, exp_ev_data)
st_code, resp = update(hge_ctx, table, where_exp, set_exp)
assert st_code == 200, resp
check_event(hge_ctx, "t1_all", table, "UPDATE", exp_ev_data, headers, "/")
exp_ev_data = {
"old": {"c1" : 1, "c2" : "world"},
"new": None
}
check_delete(hge_ctx, "t1_all", table, where_exp, exp_ev_data)
st_code, resp = delete(hge_ctx, table, where_exp)
assert st_code == 200, resp
check_event(hge_ctx, "t1_all", table, "DELETE", exp_ev_data, headers, "/")
def test_basic_dep(self,hge_ctx):
@ -54,18 +109,134 @@ class TestEvtBasic(object):
assert st_code == 400, resp
assert resp['code'] == "dependency-error", resp
class TestRetryConf(object):
@pytest.fixture(autouse=True)
def transact(self, request, hge_ctx):
print ("In setup method")
st_code, resp = hge_ctx.v1q_f('queries/event_triggers/retry_conf/setup.yaml')
assert st_code == 200, resp
yield
st_code, resp = hge_ctx.v1q_f('queries/event_triggers/retry_conf/teardown.yaml')
assert st_code == 200, resp
def test_basic(self, hge_ctx):
table = {"schema" : "hge_tests", "name": "test_t1"}
init_row = {"c1" : 1, "c2" : "hello"}
exp_ev_data = {
"old": None,
"new": init_row
}
headers = {}
st_code, resp = insert(hge_ctx, table, init_row)
assert st_code == 200, resp
time.sleep(15)
tries = hge_ctx.get_error_queue_size()
assert tries == 5, tries
class TestUpdateEvtQuery(object):
@pytest.fixture(autouse=True)
def transact(self, request, hge_ctx):
print ("In setup method")
st_code, resp = hge_ctx.v1q_f('queries/event_triggers/update_query/create-setup.yaml')
assert st_code == 200, resp
st_code, resp = hge_ctx.v1q_f('queries/event_triggers/update_query/update-setup.yaml')
assert st_code == 200, '{}'.format(resp)
yield
st_code, resp = hge_ctx.v1q_f('queries/event_triggers/update_query/teardown.yaml')
assert st_code == 200, resp
def test_update_basic(self, hge_ctx):
table = {"schema" : "hge_tests", "name": "test_t1"}
init_row = {"c1" : 1, "c2" : "hello"}
exp_ev_data = {
"old": None,
"new": {"c1": 1}
}
headers = {}
st_code, resp = insert(hge_ctx, table, init_row)
assert st_code == 200, resp
check_event(hge_ctx, "t1_cols", table, "INSERT", exp_ev_data, headers, "/new")
where_exp = {"c1": 1}
set_exp = {"c2" : "world"}
exp_ev_data = {
"old": {"c2" : "hello"},
"new": {"c2" : "world"}
}
st_code, resp = update(hge_ctx, table, where_exp, set_exp)
assert st_code == 200, resp
check_event(hge_ctx, "t1_cols", table, "UPDATE", exp_ev_data, headers, "/new")
exp_ev_data = {
"old": {"c1" : 1, "c2" : "world"},
"new": None
}
st_code, resp = delete(hge_ctx, table, where_exp)
assert st_code == 200, resp
check_event(hge_ctx, "t1_cols", table, "DELETE", exp_ev_data, headers, "/new")
class TestDeleteEvtQuery(object):
@pytest.fixture(autouse=True)
def transact(self, request, hge_ctx):
print ("In setup method")
st_code, resp = hge_ctx.v1q_f('queries/event_triggers/basic/setup.yaml')
assert st_code == 200, resp
st_code, resp = hge_ctx.v1q_f('queries/event_triggers/delete_query/setup.yaml')
assert st_code == 200, '{}'.format(resp)
yield
st_code, resp = hge_ctx.v1q_f('queries/event_triggers/delete_query/teardown.yaml')
assert st_code == 200, resp
def test_delete_basic(self, hge_ctx):
table = {"schema" : "hge_tests", "name": "test_t1"}
init_row = {"c1" : 1, "c2" : "hello"}
exp_ev_data = {
"old": None,
"new": init_row
}
headers = {}
st_code, resp = insert(hge_ctx, table, init_row)
assert st_code == 200, resp
with pytest.raises(queue.Empty):
check_event(hge_ctx, "t1_all", table, "INSERT", exp_ev_data, headers, "/")
where_exp = {"c1": 1}
set_exp = {"c2" : "world"}
exp_ev_data = {
"old": init_row,
"new": {"c1" : 1, "c2" : "world"}
}
st_code, resp = update(hge_ctx, table, where_exp, set_exp)
assert st_code == 200, resp
with pytest.raises(queue.Empty):
check_event(hge_ctx, "t1_all", table, "UPDATE", exp_ev_data, headers, "/")
exp_ev_data = {
"old": {"c1" : 1, "c2" : "world"},
"new": None
}
st_code, resp = delete(hge_ctx, table, where_exp)
assert st_code == 200, resp
with pytest.raises(queue.Empty):
check_event(hge_ctx, "t1_all", table, "DELETE", exp_ev_data, headers, "/")
@pytest.mark.usefixtures('hge_ctx')
class TestEvtSelCols:
@pytest.fixture(autouse=True)
def transact(self, request, hge_ctx):
print ("In setup method")
st_code, resp = hge_ctx.v1q_f('queries/selected_cols/setup.yaml')
st_code, resp = hge_ctx.v1q_f('queries/event_triggers/selected_cols/setup.yaml')
assert st_code == 200, resp
yield
st_code, resp = hge_ctx.v1q_f('queries/selected_cols/teardown.yaml')
st_code, resp = hge_ctx.v1q_f('queries/event_triggers/selected_cols/teardown.yaml')
assert st_code == 200, resp
def test_selected_cols(self, hge_ctx):
@ -77,7 +248,10 @@ class TestEvtSelCols:
"old": None,
"new": {"c2": "hello"}
}
check_insert(hge_ctx, "t1_all", table, init_row, exp_ev_data)
headers = {}
st_code, resp = insert(hge_ctx, table, init_row)
assert st_code == 200, resp
check_event(hge_ctx, "t1_cols", table, "INSERT", exp_ev_data, headers, "/")
where_exp = {"c1": 1}
set_exp = {"c2" : "world"}
@ -85,13 +259,17 @@ class TestEvtSelCols:
"old": {"c1" : 1},
"new": {"c1" : 1}
}
check_update(hge_ctx, "t1_all", table, init_row, where_exp, set_exp, exp_ev_data)
st_code, resp = update(hge_ctx, table, where_exp, set_exp)
assert st_code == 200, resp
check_event(hge_ctx, "t1_cols", table, "UPDATE", exp_ev_data, headers, "/")
exp_ev_data = {
"old": {"c1" : 1, "c2" : "world"},
"new": None
}
check_delete(hge_ctx, "t1_all", table, where_exp, exp_ev_data)
st_code, resp = delete(hge_ctx, table, where_exp)
assert st_code == 200, resp
check_event(hge_ctx, "t1_cols", table, "DELETE", exp_ev_data, headers, "/")
def test_selected_cols_dep(self, hge_ctx):
@ -122,19 +300,15 @@ class TestEvtSelCols:
})
assert st_code == 200, resp
class TestEvtEmptyCols:
@pytest.fixture(autouse=True)
def transact(self, request, hge_ctx):
print ("In setup method")
st_code, resp = hge_ctx.v1q_f('queries/empty_cols/setup.yaml')
st_code, resp = hge_ctx.v1q_f('queries/event_triggers/empty_cols/setup.yaml')
assert st_code == 200, resp
yield
st_code, resp = hge_ctx.v1q_f('queries/empty_cols/teardown.yaml')
st_code, resp = hge_ctx.v1q_f('queries/event_triggers/empty_cols/teardown.yaml')
assert st_code == 200, resp
@ -147,7 +321,10 @@ class TestEvtEmptyCols:
"old": None,
"new": {}
}
check_insert(hge_ctx, "t1_all", table, init_row, exp_ev_data)
headers = {}
st_code, resp = insert(hge_ctx, table, init_row)
assert st_code == 200, resp
check_event(hge_ctx, "t1_empty", table, "INSERT", exp_ev_data, headers, "/")
where_exp = {"c1": 1}
set_exp = {"c2" : "world"}
@ -155,13 +332,17 @@ class TestEvtEmptyCols:
"old": {},
"new": {}
}
check_update(hge_ctx, "t1_all", table, init_row, where_exp, set_exp, exp_ev_data)
st_code, resp = update(hge_ctx, table, where_exp, set_exp)
assert st_code == 200, resp
check_event(hge_ctx, "t1_empty", table, "UPDATE", exp_ev_data, headers, "/")
exp_ev_data = {
"old": {},
"new": None
}
check_delete(hge_ctx, "t1_all", table, where_exp, exp_ev_data)
st_code, resp = delete(hge_ctx, table, where_exp)
assert st_code == 200, resp
check_event(hge_ctx, "t1_empty", table, "DELETE", exp_ev_data, headers, "/")
class TestEvtInsertOnly:
@ -169,10 +350,10 @@ class TestEvtInsertOnly:
@pytest.fixture(autouse=True)
def transact(self, request, hge_ctx):
print ("In setup method")
st_code, resp = hge_ctx.v1q_f('queries/insert_only/setup.yaml')
st_code, resp = hge_ctx.v1q_f('queries/event_triggers/insert_only/setup.yaml')
assert st_code == 200, resp
yield
st_code, resp = hge_ctx.v1q_f('queries/insert_only/teardown.yaml')
st_code, resp = hge_ctx.v1q_f('queries/event_triggers/insert_only/teardown.yaml')
assert st_code == 200, resp
@ -185,7 +366,10 @@ class TestEvtInsertOnly:
"old": None,
"new": init_row
}
check_insert(hge_ctx, "t1_all", table, init_row, exp_ev_data)
headers = {}
st_code, resp = insert(hge_ctx, table, init_row)
assert st_code == 200, resp
check_event(hge_ctx, "t1_insert", table, "INSERT", exp_ev_data, headers, "/")
where_exp = {"c1": 1}
set_exp = {"c2" : "world"}
@ -193,13 +377,17 @@ class TestEvtInsertOnly:
"old": init_row,
"new": {"c1" : 1, "c2" : "world"}
}
st_code, resp = update(hge_ctx, table, where_exp, set_exp)
assert st_code == 200, resp
with pytest.raises(queue.Empty):
check_update(hge_ctx, "t1_all", table, init_row, where_exp, set_exp, exp_ev_data)
check_event(hge_ctx, "t1_insert", table, "UPDATE", exp_ev_data, headers, "/")
exp_ev_data = {
"old": {"c1" : 1, "c2" : "world"},
"new": None
}
st_code, resp = delete(hge_ctx, table, where_exp)
assert st_code == 200, resp
with pytest.raises(queue.Empty):
check_delete(hge_ctx, "t1_all", table, where_exp, exp_ev_data)
check_event(hge_ctx, "t1_insert", table, "DELETE", exp_ev_data, headers, "/")

View File

@ -17,65 +17,28 @@ def check_ev_payload_shape(ev_payload):
trigger_keys = ["id", "name"]
check_keys(trigger_keys, ev_payload['trigger'])
def get_event_of_query(hge_ctx, q):
st_code, resp = hge_ctx.v1q(q)
assert st_code == 200, resp
return hge_ctx.get_event(3)
def validate_event_payload(ev_payload, trig_name, table):
check_ev_payload_shape(ev_payload)
assert ev_payload['table'] == table, ev_payload
assert ev_payload['trigger']['name'] == trig_name, ev_payload
return ev_payload['event']
def validate_event_headers(ev_headers, headers):
for key, value in headers.items():
v = ev_headers.get(key)
assert v == value
def check_insert(hge_ctx, trig_name, table, row, exp_ev_data):
query = {
"type": "insert",
"args": {
"table": table,
"objects": [row]
}
}
def validate_event_webhook(ev_webhook_path, webhook_path):
assert ev_webhook_path == webhook_path
ev_payload = get_event_of_query(hge_ctx,query)
ev = validate_event_payload(ev_payload, trig_name, table)
def check_event(hge_ctx, trig_name, table, operation, exp_ev_data, headers, webhook_path):
# insert specific assertions
assert ev['op'] == "INSERT", ev_payload
assert ev['data'] == exp_ev_data, ev_payload
def check_update(hge_ctx, trig_name, table, old_row, where, set_exp, exp_ev_data):
query = {
"type": "update",
"args": {
"table": table,
"where": where,
"$set": set_exp
}
}
ev_payload = get_event_of_query(hge_ctx,query)
ev = validate_event_payload(ev_payload, trig_name, table)
# update specific assertions
assert ev['op'] == "UPDATE", ev_payload
assert ev['data'] == exp_ev_data, ev_payload
def check_delete(hge_ctx, trig_name, table, where_exp, exp_ev_data):
query = {
"type": "delete",
"args": {
"table": table,
"where": where_exp
}
}
ev_payload = get_event_of_query(hge_ctx,query)
ev = validate_event_payload(ev_payload, trig_name, table)
assert ev['op'] == "DELETE", ev_payload
assert ev['data'] == exp_ev_data, ev_payload
ev_full = hge_ctx.get_event(3)
validate_event_webhook(ev_full['path'], webhook_path)
validate_event_headers(ev_full['headers'], headers)
validate_event_payload(ev_full['body'], trig_name, table)
ev = ev_full['body']['event']
assert ev['op'] == operation, ev
assert ev['data'] == exp_ev_data, ev
def check_query(hge_ctx, conf):
headers={}