graphql-engine/server/src-lib/Hasura/RQL/DDL/EventTrigger.hs
Vishnu Bharathi P 58c44f55dd Merge oss/master onto mono/main
GitOrigin-RevId: 1c8c4d60e033c8a0bc8b2beed24c5bceb7d4bcc8
2020-11-12 22:37:19 +05:30

396 lines
14 KiB
Haskell
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

module Hasura.RQL.DDL.EventTrigger
( CreateEventTriggerQuery
, runCreateEventTriggerQuery
, DeleteEventTriggerQuery
, runDeleteEventTriggerQuery
, RedeliverEventQuery
, runRedeliverEvent
, runInvokeEventTrigger
-- TODO(from master): review
, delEventTriggerFromCatalog
, mkEventTriggerInfo
, mkAllTriggersQ
, delTriggerQ
, getEventTriggerDef
, getWebhookInfoFromConf
, getHeaderInfosFromConf
, updateEventTriggerInCatalog
, replaceEventTriggersInCatalog
) where
import Hasura.Prelude
import qualified Data.Environment as Env
import qualified Data.HashMap.Strict.Extended as Map
import qualified Data.Text as T
import qualified Data.Text.Lazy as TL
import qualified Database.PG.Query as Q
import qualified Text.Shakespeare.Text as ST
import Data.Aeson
import qualified Hasura.Backends.Postgres.SQL.DML as S
import Hasura.Backends.Postgres.SQL.Types
import Hasura.EncJSON
import Hasura.RQL.DDL.Headers
import Hasura.RQL.DML.Internal
import Hasura.RQL.Types
import Hasura.SQL.Types
data OpVar = OLD | NEW deriving (Show)
-- pgIdenTrigger is a method used to construct the name of the pg function
-- used for event triggers which are present in the hdb_catalog schema.
pgIdenTrigger:: Ops -> TriggerName -> Text
pgIdenTrigger op trn = pgFmtIdentifier . qualifyTriggerName op $ triggerNameToTxt trn
where
qualifyTriggerName op' trn' = "notify_hasura_" <> trn' <> "_" <> T.pack (show op')
mkAllTriggersQ
:: (MonadTx m, HasSQLGenCtx m)
=> TriggerName
-> QualifiedTable
-> [ColumnInfo 'Postgres]
-> TriggerOpsDef
-> m ()
mkAllTriggersQ trn qt allCols fullspec = do
onJust (tdInsert fullspec) (mkTriggerQ trn qt allCols INSERT)
onJust (tdUpdate fullspec) (mkTriggerQ trn qt allCols UPDATE)
onJust (tdDelete fullspec) (mkTriggerQ trn qt allCols DELETE)
mkTriggerQ
:: (MonadTx m, HasSQLGenCtx m)
=> TriggerName
-> QualifiedTable
-> [ColumnInfo 'Postgres]
-> Ops
-> SubscribeOpSpec
-> m ()
mkTriggerQ trn qt allCols op (SubscribeOpSpec columns payload) = do
strfyNum <- stringifyNum <$> askSQLGenCtx
liftTx $ Q.multiQE defaultTxErrorHandler $ Q.fromText . TL.toStrict $
let payloadColumns = fromMaybe SubCStar payload
mkQId opVar colInfo = toJSONableExp strfyNum (pgiType colInfo) False $
S.SEQIdentifier $ S.QIdentifier (opToQual opVar) $ toIdentifier $ pgiColumn colInfo
getRowExpression opVar = case payloadColumns of
SubCStar -> applyRowToJson $ S.SEUnsafe $ opToTxt opVar
SubCArray cols -> applyRowToJson $
S.mkRowExp $ map (toExtr . mkQId opVar) $
getColInfos cols allCols
renderRow opVar = case columns of
SubCStar -> applyRow $ S.SEUnsafe $ opToTxt opVar
SubCArray cols -> applyRow $
S.mkRowExp $ map (toExtr . mkQId opVar) $
getColInfos cols allCols
oldDataExp = case op of
INSERT -> S.SENull
UPDATE -> getRowExpression OLD
DELETE -> getRowExpression OLD
MANUAL -> S.SENull
newDataExp = case op of
INSERT -> getRowExpression NEW
UPDATE -> getRowExpression NEW
DELETE -> S.SENull
MANUAL -> S.SENull
name = triggerNameToTxt trn
qualifiedTriggerName = pgIdenTrigger op trn
qualifiedTable = toSQLTxt qt
operation = T.pack $ show op
oldRow = toSQLTxt $ renderRow OLD
newRow = toSQLTxt $ renderRow NEW
oldPayloadExpression = toSQLTxt oldDataExp
newPayloadExpression = toSQLTxt newDataExp
in $(ST.stextFile "src-rsr/trigger.sql.shakespeare")
where
applyRowToJson e = S.SEFnApp "row_to_json" [e] Nothing
applyRow e = S.SEFnApp "row" [e] Nothing
toExtr = flip S.Extractor Nothing
opToQual = S.QualVar . opToTxt
opToTxt = T.pack . show
delTriggerQ :: TriggerName -> Q.TxE QErr ()
delTriggerQ trn =
mapM_ (\op -> Q.unitQE
defaultTxErrorHandler
(Q.fromText $ getDropFuncSql op) () False) [INSERT, UPDATE, DELETE]
where
getDropFuncSql :: Ops -> T.Text
getDropFuncSql op =
"DROP FUNCTION IF EXISTS"
<> " hdb_catalog." <> pgIdenTrigger op trn <> "()"
<> " CASCADE"
addEventTriggerToCatalog
:: QualifiedTable
-> EventTriggerConf
-> Q.TxE QErr ()
addEventTriggerToCatalog qt etc = do
Q.unitQE defaultTxErrorHandler
[Q.sql|
INSERT into hdb_catalog.event_triggers
(name, type, schema_name, table_name, configuration)
VALUES ($1, 'table', $2, $3, $4)
|] (name, sn, tn, Q.AltJ $ toJSON etc) False
where
QualifiedObject sn tn = qt
(EventTriggerConf name _ _ _ _ _) = etc
delEventTriggerFromCatalog :: TriggerName -> Q.TxE QErr ()
delEventTriggerFromCatalog trn = do
Q.unitQE defaultTxErrorHandler [Q.sql|
DELETE FROM
hdb_catalog.event_triggers
WHERE name = $1
|] (Identity trn) False
delTriggerQ trn
archiveEvents trn
archiveEvents :: TriggerName -> Q.TxE QErr ()
archiveEvents trn = do
Q.unitQE defaultTxErrorHandler [Q.sql|
UPDATE hdb_catalog.event_log
SET archived = 't'
WHERE trigger_name = $1
|] (Identity trn) False
fetchEvent :: EventId -> Q.TxE QErr (EventId, Bool)
fetchEvent eid = do
events <- Q.listQE defaultTxErrorHandler
[Q.sql|
SELECT l.id, l.locked IS NOT NULL AND l.locked >= (NOW() - interval '30 minute')
FROM hdb_catalog.event_log l
JOIN hdb_catalog.event_triggers e
ON l.trigger_name = e.name
WHERE l.id = $1
|] (Identity eid) True
event <- getEvent events
assertEventUnlocked event
return event
where
getEvent [] = throw400 NotExists "event not found"
getEvent (x:_) = return x
assertEventUnlocked (_, locked) = when locked $
throw400 Busy "event is already being processed"
markForDelivery :: EventId -> Q.TxE QErr ()
markForDelivery eid =
Q.unitQE defaultTxErrorHandler [Q.sql|
UPDATE hdb_catalog.event_log
SET
delivered = 'f',
error = 'f',
tries = 0
WHERE id = $1
|] (Identity eid) True
subTableP1 :: (UserInfoM m, QErrM m, CacheRM m) => CreateEventTriggerQuery -> m (QualifiedTable, Bool, EventTriggerConf)
subTableP1 (CreateEventTriggerQuery name qt insert update delete enableManual retryConf webhook webhookFromEnv mheaders replace) = do
ti <- askTableCoreInfo qt
-- can only replace for same table
when replace $ do
ti' <- _tiCoreInfo <$> askTabInfoFromTrigger name
when (_tciName ti' /= _tciName ti) $ throw400 NotSupported "cannot replace table or schema for trigger"
assertCols ti insert
assertCols ti update
assertCols ti delete
let rconf = fromMaybe defaultRetryConf retryConf
return (qt, replace, EventTriggerConf name (TriggerOpsDef insert update delete enableManual) webhook webhookFromEnv rconf mheaders)
where
assertCols _ Nothing = return ()
assertCols ti (Just sos) = do
let cols = sosColumns sos
case cols of
SubCStar -> return ()
SubCArray pgcols -> forM_ pgcols (assertPGCol (_tciFieldInfoMap ti) "")
mkEventTriggerInfo
:: QErrM m
=> Env.Environment
-> QualifiedTable
-> EventTriggerConf
-> m (EventTriggerInfo, [SchemaDependency])
mkEventTriggerInfo env qt (EventTriggerConf name def webhook webhookFromEnv rconf mheaders) = do
webhookConf <- case (webhook, webhookFromEnv) of
(Just w, Nothing) -> return $ WCValue w
(Nothing, Just wEnv) -> return $ WCEnv wEnv
_ -> throw500 "expected webhook or webhook_from_env"
let headerConfs = fromMaybe [] mheaders
webhookInfo <- getWebhookInfoFromConf env webhookConf
headerInfos <- getHeaderInfosFromConf env headerConfs
let eTrigInfo = EventTriggerInfo name def rconf webhookInfo headerInfos
tabDep = SchemaDependency (SOTable qt) DRParent
pure (eTrigInfo, tabDep:getTrigDefDeps qt def)
getTrigDefDeps :: QualifiedTable -> TriggerOpsDef -> [SchemaDependency]
getTrigDefDeps qt (TriggerOpsDef mIns mUpd mDel _) =
mconcat $ catMaybes [ subsOpSpecDeps <$> mIns
, subsOpSpecDeps <$> mUpd
, subsOpSpecDeps <$> mDel
]
where
subsOpSpecDeps :: SubscribeOpSpec -> [SchemaDependency]
subsOpSpecDeps os =
let cols = getColsFromSub $ sosColumns os
colDeps = flip map cols $ \col ->
SchemaDependency (SOTableObj qt (TOCol col)) DRColumn
payload = maybe [] getColsFromSub (sosPayload os)
payloadDeps = flip map payload $ \col ->
SchemaDependency (SOTableObj qt (TOCol col)) DRPayload
in colDeps <> payloadDeps
getColsFromSub sc = case sc of
SubCStar -> []
SubCArray pgcols -> pgcols
runCreateEventTriggerQuery
:: (QErrM m, UserInfoM m, CacheRWM m, MonadTx m)
=> CreateEventTriggerQuery -> m EncJSON
runCreateEventTriggerQuery q = do
(qt, replace, etc) <- subTableP1 q
liftTx if replace
then updateEventTriggerInCatalog etc
else addEventTriggerToCatalog qt etc
buildSchemaCacheFor $ MOTableObj qt (MTOTrigger $ etcName etc)
return successMsg
runDeleteEventTriggerQuery
:: (MonadTx m, CacheRWM m)
=> DeleteEventTriggerQuery -> m EncJSON
runDeleteEventTriggerQuery (DeleteEventTriggerQuery name) = do
liftTx $ delEventTriggerFromCatalog name
withNewInconsistentObjsCheck buildSchemaCache
pure successMsg
deliverEvent
:: (QErrM m, MonadTx m)
=> RedeliverEventQuery -> m EncJSON
deliverEvent (RedeliverEventQuery eventId) = do
_ <- liftTx $ fetchEvent eventId
liftTx $ markForDelivery eventId
return successMsg
runRedeliverEvent
:: (MonadTx m)
=> RedeliverEventQuery -> m EncJSON
runRedeliverEvent = deliverEvent
insertManualEvent
:: QualifiedTable
-> TriggerName
-> Value
-> Q.TxE QErr EventId
insertManualEvent qt trn rowData = do
let op = T.pack $ show MANUAL
eids <- map runIdentity <$> Q.listQE defaultTxErrorHandler [Q.sql|
SELECT hdb_catalog.insert_event_log($1, $2, $3, $4, $5)
|] (sn, tn, trn, op, Q.AltJ $ toJSON rowData) True
getEid eids
where
QualifiedObject sn tn = qt
getEid [] = throw500 "could not create manual event"
getEid (x:_) = return x
runInvokeEventTrigger
:: (QErrM m, CacheRM m, MonadTx m)
=> InvokeEventTriggerQuery -> m EncJSON
runInvokeEventTrigger (InvokeEventTriggerQuery name payload) = do
trigInfo <- askEventTriggerInfo name
assertManual $ etiOpsDef trigInfo
ti <- askTabInfoFromTrigger name
eid <- liftTx $ insertManualEvent (_tciName $ _tiCoreInfo ti) name payload
return $ encJFromJValue $ object ["event_id" .= eid]
where
assertManual (TriggerOpsDef _ _ _ man) = case man of
Just True -> return ()
_ -> throw400 NotSupported "manual mode is not enabled for event trigger"
getHeaderInfosFromConf
:: QErrM m
=> Env.Environment
-> [HeaderConf]
-> m [EventHeaderInfo]
getHeaderInfosFromConf env = mapM getHeader
where
getHeader :: QErrM m => HeaderConf -> m EventHeaderInfo
getHeader hconf = case hconf of
(HeaderConf _ (HVValue val)) -> return $ EventHeaderInfo hconf val
(HeaderConf _ (HVEnv val)) -> do
envVal <- getEnv env val
return $ EventHeaderInfo hconf envVal
getWebhookInfoFromConf
:: QErrM m
=> Env.Environment
-> WebhookConf
-> m WebhookConfInfo
getWebhookInfoFromConf env wc = case wc of
WCValue w -> do
resolvedWebhook <- resolveWebhook env w
return $ WebhookConfInfo wc $ unResolvedWebhook resolvedWebhook
WCEnv we -> do
envVal <- getEnv env we
return $ WebhookConfInfo wc envVal
getEventTriggerDef
:: TriggerName
-> Q.TxE QErr (QualifiedTable, EventTriggerConf)
getEventTriggerDef triggerName = do
(sn, tn, Q.AltJ etc) <- Q.getRow <$> Q.withQE defaultTxErrorHandler
[Q.sql|
SELECT e.schema_name, e.table_name, e.configuration::json
FROM hdb_catalog.event_triggers e where e.name = $1
|] (Identity triggerName) False
return (QualifiedObject sn tn, etc)
updateEventTriggerInCatalog :: EventTriggerConf -> Q.TxE QErr ()
updateEventTriggerInCatalog trigConf =
Q.unitQE defaultTxErrorHandler
[Q.sql|
UPDATE hdb_catalog.event_triggers
SET
configuration = $1
WHERE name = $2
|] (Q.AltJ $ toJSON trigConf, etcName trigConf) True
-- | Replaces /all/ event triggers in the catalog with new ones, taking care to
-- drop SQL trigger functions and archive events for any deleted event triggers.
--
-- See Note [Diff-and-patch event triggers on replace] for more details.
replaceEventTriggersInCatalog
:: MonadTx m
=> HashMap TriggerName (QualifiedTable, EventTriggerConf)
-> m ()
replaceEventTriggersInCatalog triggerConfs = do
existingTriggers <- Map.fromListOn id <$> fetchExistingTriggers
liftTx $ for_ (align existingTriggers triggerConfs) \case
This triggerName -> delEventTriggerFromCatalog triggerName
That (tableName, triggerConf) -> addEventTriggerToCatalog tableName triggerConf
These _ (_, triggerConf) -> updateEventTriggerInCatalog triggerConf
where
fetchExistingTriggers = liftTx $ map runIdentity <$>
Q.listQE defaultTxErrorHandler
[Q.sql|SELECT name FROM hdb_catalog.event_triggers|] () True
{- Note [Diff-and-patch event triggers on replace]
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
When executing a replace_metadata API call, we usually just drop everything in
the catalog and recreate it from scratch, then rebuild the schema cache. This
works fine for most things, but its a bad idea for event triggers, because
delEventTriggerFromCatalog does extra work: it deletes the SQL trigger functions
and archives all associated events.
Therefore, we have to be more careful about which event triggers we drop. We
diff the new metadata against the old metadata, and we only drop triggers that
are actually absent in the new metadata. The replaceEventTriggersInCatalog
function implements this diff-and-patch operation. -}