mirror of
https://github.com/hasura/graphql-engine.git
synced 2024-12-17 04:24:35 +03:00
8cf134dad1
PR-URL: https://github.com/hasura/graphql-engine-mono/pull/8873 GitOrigin-RevId: 566cb4271f0eb27e6688c2e0fbc26711bdf8baa9
754 lines
29 KiB
Haskell
754 lines
29 KiB
Haskell
{-# LANGUAGE TemplateHaskell #-}
|
|
|
|
module Hasura.RQL.DDL.EventTrigger
|
|
( CreateEventTriggerQuery,
|
|
runCreateEventTriggerQuery,
|
|
DeleteEventTriggerQuery,
|
|
runDeleteEventTriggerQuery,
|
|
dropEventTriggerInMetadata,
|
|
RedeliverEventQuery,
|
|
runRedeliverEvent,
|
|
InvokeEventTriggerQuery,
|
|
runInvokeEventTrigger,
|
|
-- TODO(from master): review
|
|
ResolveHeaderError (..),
|
|
getHeaderInfosFromConf,
|
|
getHeaderInfosFromConfEither,
|
|
getWebhookInfoFromConf,
|
|
buildEventTriggerInfo,
|
|
getSourceTableAndTriggers,
|
|
getTriggerNames,
|
|
getTriggersMap,
|
|
getTableNameFromTrigger,
|
|
cetqSource,
|
|
cetqName,
|
|
cetqTable,
|
|
cetqInsert,
|
|
cetqUpdate,
|
|
cetqDelete,
|
|
cetqEnableManual,
|
|
cetqRetryConf,
|
|
cetqWebhook,
|
|
cetqWebhookFromEnv,
|
|
cetqHeaders,
|
|
cetqReplace,
|
|
cetqRequestTransform,
|
|
cetqResponseTrasnform,
|
|
cteqCleanupConfig,
|
|
cteqTriggerOnReplication,
|
|
runCleanupEventTriggerLog,
|
|
runEventTriggerResumeCleanup,
|
|
runEventTriggerPauseCleanup,
|
|
MonadEventLogCleanup (..),
|
|
getAllEventTriggersWithCleanupConfig,
|
|
getAllETWithCleanupConfigInTableMetadata,
|
|
)
|
|
where
|
|
|
|
import Control.Lens (ifor_, makeLenses, (.~))
|
|
import Data.Aeson
|
|
import Data.Either.Combinators
|
|
import Data.Environment qualified as Env
|
|
import Data.Has (Has)
|
|
import Data.HashMap.Strict qualified as HM
|
|
import Data.HashMap.Strict qualified as Map
|
|
import Data.HashMap.Strict.InsOrd qualified as OMap
|
|
import Data.HashSet qualified as Set
|
|
import Data.Sequence qualified as Seq
|
|
import Data.Text qualified as T
|
|
import Data.Text.Extended
|
|
import Data.URL.Template (printURLTemplate)
|
|
import Hasura.Base.Error
|
|
import Hasura.EncJSON
|
|
import Hasura.Eventing.EventTrigger (logQErr)
|
|
import Hasura.Logging qualified as L
|
|
import Hasura.Prelude
|
|
import Hasura.RQL.DDL.Webhook.Transform (MetadataResponseTransform, RequestTransform)
|
|
import Hasura.RQL.Types.Backend
|
|
import Hasura.RQL.Types.Common
|
|
import Hasura.RQL.Types.EventTrigger
|
|
import Hasura.RQL.Types.Eventing
|
|
import Hasura.RQL.Types.Eventing.Backend
|
|
import Hasura.RQL.Types.Headers (HeaderValue (..))
|
|
import Hasura.RQL.Types.Metadata
|
|
import Hasura.RQL.Types.Metadata.Backend
|
|
import Hasura.RQL.Types.Metadata.Object
|
|
import Hasura.RQL.Types.SchemaCache
|
|
import Hasura.RQL.Types.SchemaCache.Build
|
|
import Hasura.RQL.Types.SchemaCacheTypes
|
|
import Hasura.RQL.Types.Source
|
|
import Hasura.RQL.Types.Table
|
|
import Hasura.SQL.AnyBackend qualified as AB
|
|
import Hasura.SQL.Backend
|
|
import Hasura.Session
|
|
import Hasura.Tracing (TraceT)
|
|
import Hasura.Tracing qualified as Tracing
|
|
|
|
data CreateEventTriggerQuery (b :: BackendType) = CreateEventTriggerQuery
|
|
{ _cetqSource :: SourceName,
|
|
_cetqName :: TriggerName,
|
|
_cetqTable :: TableName b,
|
|
_cetqInsert :: Maybe (SubscribeOpSpec b),
|
|
_cetqUpdate :: Maybe (SubscribeOpSpec b),
|
|
_cetqDelete :: Maybe (SubscribeOpSpec b),
|
|
_cetqEnableManual :: Maybe Bool,
|
|
_cetqRetryConf :: Maybe RetryConf,
|
|
_cetqWebhook :: Maybe InputWebhook,
|
|
_cetqWebhookFromEnv :: Maybe Text,
|
|
_cetqHeaders :: Maybe [HeaderConf],
|
|
_cetqReplace :: Bool,
|
|
_cetqRequestTransform :: Maybe RequestTransform,
|
|
_cetqResponseTrasnform :: Maybe MetadataResponseTransform,
|
|
_cteqCleanupConfig :: Maybe AutoTriggerLogCleanupConfig,
|
|
_cteqTriggerOnReplication :: TriggerOnReplication
|
|
}
|
|
|
|
$(makeLenses ''CreateEventTriggerQuery)
|
|
|
|
instance Backend b => FromJSON (CreateEventTriggerQuery b) where
|
|
parseJSON = withObject "CreateEventTriggerQuery" \o -> do
|
|
sourceName <- o .:? "source" .!= defaultSource
|
|
name <- o .: "name"
|
|
table <- o .: "table"
|
|
insert <- o .:? "insert"
|
|
update <- o .:? "update"
|
|
delete <- o .:? "delete"
|
|
enableManual <- o .:? "enable_manual" .!= False
|
|
retryConf <- o .:? "retry_conf"
|
|
webhook <- o .:? "webhook"
|
|
webhookFromEnv <- o .:? "webhook_from_env"
|
|
headers <- o .:? "headers"
|
|
replace <- o .:? "replace" .!= False
|
|
requestTransform <- o .:? "request_transform"
|
|
responseTransform <- o .:? "response_transform"
|
|
cleanupConfig <- o .:? "cleanup_config"
|
|
when (isIllegalTriggerName name) $
|
|
fail "only alphanumeric and underscore and hyphens allowed for name"
|
|
unless (T.length (triggerNameToTxt name) <= maxTriggerNameLength) $
|
|
fail "event trigger name can be at most 42 characters"
|
|
unless (any isJust [insert, update, delete] || enableManual) $
|
|
fail "atleast one amongst insert/update/delete/enable_manual spec must be provided"
|
|
case (webhook, webhookFromEnv) of
|
|
(Just _, Nothing) -> return ()
|
|
(Nothing, Just _) -> return ()
|
|
(Just _, Just _) -> fail "only one of webhook or webhook_from_env should be given"
|
|
_ -> fail "must provide webhook or webhook_from_env"
|
|
mapM_ checkEmptyCols [insert, update, delete]
|
|
defTOR <- case defaultTriggerOnReplication @b of
|
|
Just (_, dt) -> pure dt
|
|
Nothing -> fail "No default setting for trigger_on_replication is defined for backend type."
|
|
triggerOnReplication <- o .:? "trigger_on_replication" .!= defTOR
|
|
return $ CreateEventTriggerQuery sourceName name table insert update delete (Just enableManual) retryConf webhook webhookFromEnv headers replace requestTransform responseTransform cleanupConfig triggerOnReplication
|
|
where
|
|
checkEmptyCols spec =
|
|
case spec of
|
|
Just (SubscribeOpSpec (SubCArray cols) _) -> when (null cols) (fail "found empty column specification")
|
|
Just (SubscribeOpSpec _ (Just (SubCArray cols))) -> when (null cols) (fail "found empty payload specification")
|
|
_ -> return ()
|
|
|
|
data DeleteEventTriggerQuery (b :: BackendType) = DeleteEventTriggerQuery
|
|
{ _detqSource :: SourceName,
|
|
_detqName :: TriggerName
|
|
}
|
|
|
|
instance FromJSON (DeleteEventTriggerQuery b) where
|
|
parseJSON = withObject "DeleteEventTriggerQuery" $ \o ->
|
|
DeleteEventTriggerQuery
|
|
<$> o .:? "source" .!= defaultSource
|
|
<*> o .: "name"
|
|
|
|
data RedeliverEventQuery (b :: BackendType) = RedeliverEventQuery
|
|
{ _rdeqEventId :: EventId,
|
|
_rdeqSource :: SourceName
|
|
}
|
|
|
|
instance FromJSON (RedeliverEventQuery b) where
|
|
parseJSON = withObject "RedeliverEventQuery" $ \o ->
|
|
RedeliverEventQuery
|
|
<$> o .: "event_id"
|
|
<*> o .:? "source" .!= defaultSource
|
|
|
|
data InvokeEventTriggerQuery (b :: BackendType) = InvokeEventTriggerQuery
|
|
{ _ietqName :: TriggerName,
|
|
_ietqSource :: SourceName,
|
|
_ietqPayload :: Value
|
|
}
|
|
|
|
instance Backend b => FromJSON (InvokeEventTriggerQuery b) where
|
|
parseJSON = withObject "InvokeEventTriggerQuery" $ \o ->
|
|
InvokeEventTriggerQuery
|
|
<$> o .: "name"
|
|
<*> o .:? "source" .!= defaultSource
|
|
<*> o .: "payload"
|
|
|
|
-- | This typeclass have the implementation logic for the event trigger log cleanup.
|
|
--
|
|
-- TODO: this doesn't belong here in the DDL folder, but should be part of
|
|
-- Hasura.Eventing. It could even be made a Service, since the whole point of it
|
|
-- is to implement features differently between OSS and Pro.
|
|
class Monad m => MonadEventLogCleanup m where
|
|
-- Deletes the logs of event triggers
|
|
runLogCleaner ::
|
|
SourceCache -> TriggerLogCleanupConfig -> m (Either QErr EncJSON)
|
|
|
|
-- Generates the cleanup schedules for event triggers which have log cleaners installed
|
|
generateCleanupSchedules ::
|
|
AB.AnyBackend SourceInfo -> TriggerName -> AutoTriggerLogCleanupConfig -> m (Either QErr ())
|
|
|
|
-- | `updateTriggerCleanupSchedules` is primarily used to update the
|
|
-- cleanup schedules associated with an event trigger in case the cleanup
|
|
-- config has changed while replacing the metadata.
|
|
--
|
|
-- In case,
|
|
-- i. a source has been dropped -
|
|
-- We don't need to clear the cleanup schedules
|
|
-- because the event log cleanup table is dropped as part
|
|
-- of the post drop source hook.
|
|
-- ii. a table or an event trigger has been dropped/updated -
|
|
-- Older cleanup events will be deleted first and in case of
|
|
-- an update, new cleanup events will be generated and inserted
|
|
-- into the table.
|
|
-- iii. a new event trigger with cleanup config has been added -
|
|
-- Generate the cleanup events and insert it.
|
|
-- iv. a new source has been added -
|
|
-- Generate the cleanup events and insert it.
|
|
-- v. the cron schedule for event trigger cleanup config has changed -
|
|
-- Delete cleanup events with older cron schedule and generate
|
|
-- cleanup events with new cron schedule.
|
|
updateTriggerCleanupSchedules ::
|
|
L.Logger L.Hasura ->
|
|
InsOrdHashMap SourceName BackendSourceMetadata ->
|
|
InsOrdHashMap SourceName BackendSourceMetadata ->
|
|
SchemaCache ->
|
|
m (Either QErr ())
|
|
|
|
instance (MonadEventLogCleanup m) => MonadEventLogCleanup (ReaderT r m) where
|
|
runLogCleaner sourceCache conf = lift $ runLogCleaner sourceCache conf
|
|
generateCleanupSchedules sourceInfo triggerName cleanupConfig = lift $ generateCleanupSchedules sourceInfo triggerName cleanupConfig
|
|
updateTriggerCleanupSchedules logger oldSources newSources schemaCache = lift $ updateTriggerCleanupSchedules logger oldSources newSources schemaCache
|
|
|
|
instance (MonadEventLogCleanup m) => MonadEventLogCleanup (ExceptT e m) where
|
|
runLogCleaner sourceCache conf = lift $ runLogCleaner sourceCache conf
|
|
generateCleanupSchedules sourceInfo triggerName cleanupConfig = lift $ generateCleanupSchedules sourceInfo triggerName cleanupConfig
|
|
updateTriggerCleanupSchedules logger oldSources newSources schemaCache = lift $ updateTriggerCleanupSchedules logger oldSources newSources schemaCache
|
|
|
|
instance (MonadEventLogCleanup m) => MonadEventLogCleanup (MetadataT m) where
|
|
runLogCleaner sourceCache conf = lift $ runLogCleaner sourceCache conf
|
|
generateCleanupSchedules sourceInfo triggerName cleanupConfig = lift $ generateCleanupSchedules sourceInfo triggerName cleanupConfig
|
|
updateTriggerCleanupSchedules logger oldSources newSources schemaCache = lift $ updateTriggerCleanupSchedules logger oldSources newSources schemaCache
|
|
|
|
instance (MonadEventLogCleanup m) => MonadEventLogCleanup (TraceT m) where
|
|
runLogCleaner sourceCache conf = lift $ runLogCleaner sourceCache conf
|
|
generateCleanupSchedules sourceInfo triggerName cleanupConfig = lift $ generateCleanupSchedules sourceInfo triggerName cleanupConfig
|
|
updateTriggerCleanupSchedules logger oldSources newSources schemaCache = lift $ updateTriggerCleanupSchedules logger oldSources newSources schemaCache
|
|
|
|
instance (MonadEventLogCleanup m) => MonadEventLogCleanup (StateT w m) where
|
|
runLogCleaner sourceCache conf = lift $ runLogCleaner sourceCache conf
|
|
generateCleanupSchedules sourceInfo triggerName cleanupConfig = lift $ generateCleanupSchedules sourceInfo triggerName cleanupConfig
|
|
updateTriggerCleanupSchedules logger oldSources newSources schemaCache = lift $ updateTriggerCleanupSchedules logger oldSources newSources schemaCache
|
|
|
|
resolveEventTriggerQuery ::
|
|
forall b m.
|
|
(Backend b, UserInfoM m, QErrM m, CacheRM m) =>
|
|
CreateEventTriggerQuery b ->
|
|
m (Bool, EventTriggerConf b)
|
|
resolveEventTriggerQuery (CreateEventTriggerQuery source name qt insert update delete enableManual retryConf webhook webhookFromEnv mheaders replace reqTransform respTransform cleanupConfig triggerOnReplication) = do
|
|
ti <- askTableCoreInfo source qt
|
|
-- can only replace for same table
|
|
when replace $ do
|
|
ti' <- _tiCoreInfo <$> askTabInfoFromTrigger @b source name
|
|
when (_tciName ti' /= _tciName ti) $ throw400 NotSupported "cannot replace table or schema for trigger"
|
|
|
|
assertCols ti insert
|
|
assertCols ti update
|
|
assertCols ti delete
|
|
|
|
let rconf = fromMaybe defaultRetryConf retryConf
|
|
return (replace, EventTriggerConf name (TriggerOpsDef insert update delete enableManual) webhook webhookFromEnv rconf mheaders reqTransform respTransform cleanupConfig triggerOnReplication)
|
|
where
|
|
assertCols :: TableCoreInfo b -> Maybe (SubscribeOpSpec b) -> m ()
|
|
assertCols ti opSpec = for_ opSpec \sos -> case sosColumns sos of
|
|
SubCStar -> return ()
|
|
SubCArray columns -> forM_ columns (assertColumnExists @b (_tciFieldInfoMap ti) "")
|
|
|
|
droppedTriggerOps :: TriggerOpsDef b -> TriggerOpsDef b -> HashSet Ops
|
|
droppedTriggerOps oldEventTriggerOps newEventTriggerOps =
|
|
Set.fromList $
|
|
catMaybes $
|
|
[ (bool Nothing (Just INSERT) (isDroppedOp (tdInsert oldEventTriggerOps) (tdInsert newEventTriggerOps))),
|
|
(bool Nothing (Just UPDATE) (isDroppedOp (tdUpdate oldEventTriggerOps) (tdUpdate newEventTriggerOps))),
|
|
(bool Nothing (Just DELETE) (isDroppedOp (tdDelete oldEventTriggerOps) (tdDelete newEventTriggerOps)))
|
|
]
|
|
where
|
|
isDroppedOp old new = isJust old && isNothing new
|
|
|
|
createEventTriggerQueryMetadata ::
|
|
forall b m r.
|
|
( BackendMetadata b,
|
|
QErrM m,
|
|
UserInfoM m,
|
|
CacheRWM m,
|
|
MetadataM m,
|
|
BackendEventTrigger b,
|
|
MonadIO m,
|
|
MonadEventLogCleanup m,
|
|
MonadReader r m,
|
|
Has (L.Logger L.Hasura) r
|
|
) =>
|
|
CreateEventTriggerQuery b ->
|
|
m ()
|
|
createEventTriggerQueryMetadata q = do
|
|
(replace, triggerConf) <- resolveEventTriggerQuery q
|
|
let table = _cetqTable q
|
|
source = _cetqSource q
|
|
triggerName = etcName triggerConf
|
|
metadataObj =
|
|
MOSourceObjId source $
|
|
AB.mkAnyBackend $
|
|
SMOTableObj @b table $
|
|
MTOTrigger triggerName
|
|
sourceInfo <- askSourceInfo @b source
|
|
let sourceConfig = (_siConfiguration sourceInfo)
|
|
newConfig = _cteqCleanupConfig q
|
|
|
|
-- Check for existence of a trigger with 'triggerName' only when 'replace' is not set
|
|
if replace
|
|
then do
|
|
existingEventTriggerOps <- etiOpsDef <$> askEventTriggerInfo @b source triggerName
|
|
let droppedOps = droppedTriggerOps existingEventTriggerOps (etcDefinition triggerConf)
|
|
dropDanglingSQLTrigger @b (_siConfiguration sourceInfo) triggerName table droppedOps
|
|
|
|
-- check if cron schedule for the cleanup config has changed then delete the scheduled cleanups
|
|
oldConfig <- etiCleanupConfig <$> askEventTriggerInfo @b source triggerName
|
|
when (hasCleanupCronScheduleUpdated oldConfig newConfig) do
|
|
deleteAllScheduledCleanups @b sourceConfig triggerName
|
|
for_ newConfig \cleanupConfig -> do
|
|
(`onLeft` logQErr) =<< generateCleanupSchedules (AB.mkAnyBackend sourceInfo) triggerName cleanupConfig
|
|
else do
|
|
doesTriggerExists <- checkIfTriggerExists @b sourceConfig triggerName (Set.fromList [INSERT, UPDATE, DELETE])
|
|
if doesTriggerExists
|
|
then throw400 AlreadyExists ("Event trigger with name " <> triggerNameToTxt triggerName <<> " already exists")
|
|
else for_ newConfig \cleanupConfig -> do
|
|
(`onLeft` logQErr) =<< generateCleanupSchedules (AB.mkAnyBackend sourceInfo) triggerName cleanupConfig
|
|
|
|
buildSchemaCacheFor metadataObj $
|
|
MetadataModifier $
|
|
tableMetadataSetter @b source table . tmEventTriggers
|
|
%~ if replace
|
|
then ix triggerName .~ triggerConf
|
|
else OMap.insert triggerName triggerConf
|
|
|
|
runCreateEventTriggerQuery ::
|
|
forall b m r.
|
|
( BackendMetadata b,
|
|
BackendEventTrigger b,
|
|
QErrM m,
|
|
UserInfoM m,
|
|
CacheRWM m,
|
|
MetadataM m,
|
|
MonadIO m,
|
|
MonadEventLogCleanup m,
|
|
MonadReader r m,
|
|
Has (L.Logger L.Hasura) r
|
|
) =>
|
|
CreateEventTriggerQuery b ->
|
|
m EncJSON
|
|
runCreateEventTriggerQuery q = do
|
|
createEventTriggerQueryMetadata @b q
|
|
pure successMsg
|
|
|
|
runDeleteEventTriggerQuery ::
|
|
forall b m.
|
|
(BackendEventTrigger b, MonadError QErr m, CacheRWM m, MonadIO m, MetadataM m) =>
|
|
DeleteEventTriggerQuery b ->
|
|
m EncJSON
|
|
runDeleteEventTriggerQuery (DeleteEventTriggerQuery sourceName triggerName) = do
|
|
sourceConfig <- askSourceConfig @b sourceName
|
|
tableName <- (_tciName . _tiCoreInfo) <$> askTabInfoFromTrigger @b sourceName triggerName
|
|
|
|
withNewInconsistentObjsCheck $
|
|
buildSchemaCache $
|
|
MetadataModifier $
|
|
tableMetadataSetter @b sourceName tableName %~ dropEventTriggerInMetadata triggerName
|
|
|
|
dropTriggerAndArchiveEvents @b sourceConfig triggerName tableName
|
|
|
|
deleteAllScheduledCleanups @b sourceConfig triggerName
|
|
|
|
pure successMsg
|
|
|
|
runRedeliverEvent ::
|
|
forall b m.
|
|
(BackendEventTrigger b, MonadIO m, CacheRM m, QErrM m, MetadataM m) =>
|
|
RedeliverEventQuery b ->
|
|
m EncJSON
|
|
runRedeliverEvent (RedeliverEventQuery eventId source) = do
|
|
sourceConfig <- askSourceConfig @b source
|
|
redeliverEvent @b sourceConfig eventId
|
|
pure successMsg
|
|
|
|
runInvokeEventTrigger ::
|
|
forall b m.
|
|
( MonadIO m,
|
|
QErrM m,
|
|
CacheRM m,
|
|
MetadataM m,
|
|
Tracing.MonadTrace m,
|
|
UserInfoM m,
|
|
BackendEventTrigger b
|
|
) =>
|
|
InvokeEventTriggerQuery b ->
|
|
m EncJSON
|
|
runInvokeEventTrigger (InvokeEventTriggerQuery name source payload) = do
|
|
trigInfo <- askEventTriggerInfo @b source name
|
|
assertManual $ etiOpsDef trigInfo
|
|
ti <- askTabInfoFromTrigger source name
|
|
sourceConfig <- askSourceConfig @b source
|
|
traceCtx <- Tracing.currentContext
|
|
userInfo <- askUserInfo
|
|
eid <- insertManualEvent @b sourceConfig (tableInfoName @b ti) name (makePayload payload) userInfo traceCtx
|
|
return $ encJFromJValue $ object ["event_id" .= eid]
|
|
where
|
|
makePayload o = object ["old" .= Null, "new" .= o]
|
|
|
|
assertManual (TriggerOpsDef _ _ _ man) = case man of
|
|
Just True -> return ()
|
|
_ -> throw400 NotSupported "manual mode is not enabled for event trigger"
|
|
|
|
askTabInfoFromTrigger ::
|
|
(Backend b, QErrM m, CacheRM m) =>
|
|
SourceName ->
|
|
TriggerName ->
|
|
m (TableInfo b)
|
|
askTabInfoFromTrigger sourceName triggerName = do
|
|
schemaCache <- askSchemaCache
|
|
getTabInfoFromSchemaCache schemaCache sourceName triggerName
|
|
`onNothing` throw400 NotExists errMsg
|
|
where
|
|
errMsg = "event trigger " <> triggerName <<> " does not exist"
|
|
|
|
getTabInfoFromSchemaCache ::
|
|
Backend b =>
|
|
SchemaCache ->
|
|
SourceName ->
|
|
TriggerName ->
|
|
Maybe (TableInfo b)
|
|
getTabInfoFromSchemaCache schemaCache sourceName triggerName = do
|
|
tableCache <- unsafeTableCache sourceName $ scSources schemaCache
|
|
find (isJust . HM.lookup triggerName . _tiEventTriggerInfoMap) (HM.elems tableCache)
|
|
|
|
askEventTriggerInfo ::
|
|
forall b m.
|
|
(QErrM m, CacheRM m, Backend b) =>
|
|
SourceName ->
|
|
TriggerName ->
|
|
m (EventTriggerInfo b)
|
|
askEventTriggerInfo sourceName triggerName = do
|
|
triggerInfo <- askTabInfoFromTrigger @b sourceName triggerName
|
|
let eventTriggerInfoMap = _tiEventTriggerInfoMap triggerInfo
|
|
HM.lookup triggerName eventTriggerInfoMap `onNothing` throw400 NotExists errMsg
|
|
where
|
|
errMsg = "event trigger " <> triggerName <<> " does not exist"
|
|
|
|
-- This change helps us create functions for the event triggers
|
|
-- without the function name being truncated by PG, since PG allows
|
|
-- for only 63 chars for identifiers.
|
|
-- Reasoning for the 42 characters:
|
|
-- 63 - (notify_hasura_) - (_INSERT | _UPDATE | _DELETE)
|
|
maxTriggerNameLength :: Int
|
|
maxTriggerNameLength = 42
|
|
|
|
-- Consists of a list of environment variables with invalid/missing values
|
|
newtype ResolveHeaderError = ResolveHeaderError {unResolveHeaderError :: [Text]} deriving (Show)
|
|
|
|
instance ToTxt ResolveHeaderError where
|
|
toTxt = commaSeparated . unResolveHeaderError
|
|
|
|
getHeaderInfosFromConf ::
|
|
QErrM m =>
|
|
Env.Environment ->
|
|
[HeaderConf] ->
|
|
m [EventHeaderInfo]
|
|
getHeaderInfosFromConf env = mapM getHeader
|
|
where
|
|
getHeader :: QErrM m => HeaderConf -> m EventHeaderInfo
|
|
getHeader hconf = case hconf of
|
|
(HeaderConf _ (HVValue val)) -> return $ EventHeaderInfo hconf val
|
|
(HeaderConf _ (HVEnv val)) -> do
|
|
envVal <- getEnv env val
|
|
return $ EventHeaderInfo hconf envVal
|
|
|
|
-- This is similar to `getHeaderInfosFromConf` but it doesn't fail when an env var is invalid
|
|
getHeaderInfosFromConfEither ::
|
|
Env.Environment ->
|
|
[HeaderConf] ->
|
|
Either ResolveHeaderError [EventHeaderInfo]
|
|
getHeaderInfosFromConfEither env hConfList =
|
|
if isHeaderError
|
|
then Left (ResolveHeaderError $ lefts headerInfoList)
|
|
else Right (rights headerInfoList)
|
|
where
|
|
isHeaderError = any isLeft headerInfoList
|
|
headerInfoList = map getHeader hConfList
|
|
getHeader :: HeaderConf -> Either Text EventHeaderInfo
|
|
getHeader hconf = case hconf of
|
|
(HeaderConf _ (HVValue val)) -> Right $ EventHeaderInfo hconf val
|
|
(HeaderConf _ (HVEnv val)) ->
|
|
(Right . EventHeaderInfo hconf) =<< getEnvEither env val
|
|
|
|
getWebhookInfoFromConf ::
|
|
QErrM m =>
|
|
Env.Environment ->
|
|
WebhookConf ->
|
|
m WebhookConfInfo
|
|
getWebhookInfoFromConf env webhookConf = case webhookConf of
|
|
WCValue w -> do
|
|
resolvedWebhook <- resolveWebhook env w
|
|
let urlTemplate = printURLTemplate $ unInputWebhook w
|
|
-- `urlTemplate` can either be the template value({{TEST}}) or a plain text.
|
|
-- When `urlTemplate` is a template value then '_envVarName' of the 'EnvRecord'
|
|
-- will be the template value i.e '{{TEST}}'
|
|
-- When `urlTemplate` is a plain text then '_envVarName' of the 'EnvRecord' will be the plain text value.
|
|
return $ WebhookConfInfo webhookConf (EnvRecord urlTemplate resolvedWebhook)
|
|
WCEnv webhookEnvVar -> do
|
|
envVal <- getEnv env webhookEnvVar
|
|
return $ WebhookConfInfo webhookConf (EnvRecord webhookEnvVar (ResolvedWebhook envVal))
|
|
|
|
buildEventTriggerInfo ::
|
|
forall b m.
|
|
(Backend b, QErrM m) =>
|
|
Env.Environment ->
|
|
SourceName ->
|
|
TableName b ->
|
|
EventTriggerConf b ->
|
|
m (EventTriggerInfo b, Seq SchemaDependency)
|
|
buildEventTriggerInfo
|
|
env
|
|
source
|
|
tableName
|
|
( EventTriggerConf
|
|
name
|
|
def
|
|
webhook
|
|
webhookFromEnv
|
|
rconf
|
|
mheaders
|
|
reqTransform
|
|
respTransform
|
|
cleanupConfig
|
|
triggerOnReplication
|
|
) = do
|
|
webhookConf <- case (webhook, webhookFromEnv) of
|
|
(Just w, Nothing) -> return $ WCValue w
|
|
(Nothing, Just wEnv) -> return $ WCEnv wEnv
|
|
_ -> throw500 "expected webhook or webhook_from_env"
|
|
let headerConfs = fromMaybe [] mheaders
|
|
webhookInfo <- getWebhookInfoFromConf env webhookConf
|
|
headerInfos <- getHeaderInfosFromConf env headerConfs
|
|
let eTrigInfo =
|
|
EventTriggerInfo
|
|
name
|
|
def
|
|
rconf
|
|
webhookInfo
|
|
headerInfos
|
|
reqTransform
|
|
respTransform
|
|
cleanupConfig
|
|
triggerOnReplication
|
|
tabDep =
|
|
SchemaDependency
|
|
( SOSourceObj source $
|
|
AB.mkAnyBackend $
|
|
SOITable @b tableName
|
|
)
|
|
DRParent
|
|
pure (eTrigInfo, tabDep Seq.:<| getTrigDefDeps @b source tableName def)
|
|
|
|
getTrigDefDeps ::
|
|
forall b.
|
|
Backend b =>
|
|
SourceName ->
|
|
TableName b ->
|
|
TriggerOpsDef b ->
|
|
Seq SchemaDependency
|
|
getTrigDefDeps source tableName (TriggerOpsDef mIns mUpd mDel _) =
|
|
mconcat $
|
|
Seq.fromList
|
|
<$> catMaybes
|
|
[ subsOpSpecDeps <$> mIns,
|
|
subsOpSpecDeps <$> mUpd,
|
|
subsOpSpecDeps <$> mDel
|
|
]
|
|
where
|
|
subsOpSpecDeps :: SubscribeOpSpec b -> [SchemaDependency]
|
|
subsOpSpecDeps os =
|
|
let cols = getColsFromSub $ sosColumns os
|
|
mkColDependency dependencyReason col =
|
|
SchemaDependency
|
|
( SOSourceObj source $
|
|
AB.mkAnyBackend $
|
|
SOITableObj @b tableName (TOCol @b col)
|
|
)
|
|
dependencyReason
|
|
colDeps = map (mkColDependency DRColumn) cols
|
|
payload = maybe [] getColsFromSub (sosPayload os)
|
|
payloadDeps = map (mkColDependency DRPayload) payload
|
|
in colDeps <> payloadDeps
|
|
getColsFromSub sc = case sc of
|
|
SubCStar -> []
|
|
SubCArray cols -> cols
|
|
|
|
getTriggersMap ::
|
|
SourceMetadata b ->
|
|
InsOrdHashMap TriggerName (EventTriggerConf b)
|
|
getTriggersMap = OMap.unions . map _tmEventTriggers . OMap.elems . _smTables
|
|
|
|
getSourceTableAndTriggers ::
|
|
SourceMetadata b ->
|
|
[(TableName b, TriggerName)]
|
|
getSourceTableAndTriggers =
|
|
(concatMap mkKeyValue) . OMap.toList . _smTables
|
|
where
|
|
mkKeyValue (tableName, tableMetadata) = map (tableName,) $ OMap.keys (_tmEventTriggers tableMetadata)
|
|
|
|
getTriggerNames ::
|
|
SourceMetadata b ->
|
|
Set.HashSet TriggerName
|
|
getTriggerNames = Set.fromList . OMap.keys . getTriggersMap
|
|
|
|
getTableNameFromTrigger ::
|
|
forall b.
|
|
Backend b =>
|
|
SchemaCache ->
|
|
SourceName ->
|
|
TriggerName ->
|
|
Maybe (TableName b)
|
|
getTableNameFromTrigger schemaCache sourceName triggerName = do
|
|
tableInfo <- getTabInfoFromSchemaCache @b schemaCache sourceName triggerName
|
|
pure $ _tciName $ _tiCoreInfo tableInfo
|
|
|
|
runCleanupEventTriggerLog ::
|
|
(MonadEventLogCleanup m, MonadError QErr m, CacheRWM m) =>
|
|
TriggerLogCleanupConfig ->
|
|
m EncJSON
|
|
runCleanupEventTriggerLog conf = do
|
|
sourceCache <- scSources <$> askSchemaCache
|
|
runLogCleaner sourceCache conf `onLeftM` throwError
|
|
|
|
-- | Updates the cleanup switch in metadata given the source, table and trigger name
|
|
-- The Bool value represents the status of the cleaner, whether to start or pause it
|
|
updateCleanupStatusInMetadata ::
|
|
forall b m.
|
|
(Backend b, QErrM m, CacheRWM m, MetadataM m) =>
|
|
AutoTriggerLogCleanupConfig ->
|
|
EventTriggerCleanupStatus ->
|
|
SourceName ->
|
|
TableName b ->
|
|
TriggerName ->
|
|
m ()
|
|
updateCleanupStatusInMetadata cleanupConfig cleanupSwitch sourceName tableName triggerName = do
|
|
let newCleanupConfig = Just $ cleanupConfig {_atlccPaused = cleanupSwitch}
|
|
metadataObj =
|
|
MOSourceObjId sourceName $
|
|
AB.mkAnyBackend $
|
|
SMOTableObj @b tableName $
|
|
MTOTrigger triggerName
|
|
|
|
buildSchemaCacheFor metadataObj $
|
|
MetadataModifier $
|
|
tableMetadataSetter @b sourceName tableName . tmEventTriggers . ix triggerName %~ updateCleanupConfig newCleanupConfig
|
|
|
|
-- | Function to start/stop the cleanup action based on the event triggers supplied in
|
|
-- TriggerLogCleanupToggleConfig conf
|
|
toggleEventTriggerCleanupAction ::
|
|
forall m.
|
|
(MonadIO m, QErrM m, CacheRWM m, MetadataM m) =>
|
|
TriggerLogCleanupToggleConfig ->
|
|
EventTriggerCleanupStatus ->
|
|
m EncJSON
|
|
toggleEventTriggerCleanupAction conf cleanupSwitch = do
|
|
schemaCache <- askSchemaCache
|
|
case conf of
|
|
TriggerLogCleanupSources tlcs -> do
|
|
case tlcs of
|
|
TriggerAllSource -> do
|
|
ifor_ (scSources schemaCache) $ \sourceName backendSourceInfo -> do
|
|
AB.dispatchAnyBackend @BackendEventTrigger backendSourceInfo \(SourceInfo {..} :: SourceInfo b) -> do
|
|
traverseTableHelper _siTables cleanupSwitch sourceName
|
|
TriggerSource sourceNameLst -> do
|
|
forM_ sourceNameLst $ \sourceName -> do
|
|
backendSourceInfo <-
|
|
HM.lookup sourceName (scSources schemaCache)
|
|
`onNothing` throw400 NotExists ("source with name " <> sourceNameToText sourceName <> " does not exists")
|
|
|
|
AB.dispatchAnyBackend @BackendEventTrigger backendSourceInfo \(SourceInfo {..} :: SourceInfo b) -> do
|
|
traverseTableHelper _siTables cleanupSwitch sourceName
|
|
TriggerQualifier qualifierLst -> do
|
|
forM_ qualifierLst $ \qualifier -> do
|
|
let sourceName = _etqSourceName qualifier
|
|
triggerNames = _etqEventTriggers qualifier
|
|
|
|
backendSourceInfo <-
|
|
HM.lookup sourceName (scSources schemaCache)
|
|
`onNothing` throw400 NotExists ("source with name " <> sourceNameToText sourceName <> " does not exists")
|
|
|
|
AB.dispatchAnyBackend @BackendEventTrigger backendSourceInfo \(SourceInfo {} :: SourceInfo b) -> do
|
|
for_ triggerNames $ \triggerName -> do
|
|
eventTriggerInfo <- askEventTriggerInfo @b sourceName triggerName
|
|
tableName <-
|
|
getTableNameFromTrigger @b schemaCache sourceName triggerName
|
|
`onNothing` throw400 NotExists ("event trigger " <> triggerName <<> " does not exist")
|
|
cleanupConfig <-
|
|
etiCleanupConfig eventTriggerInfo
|
|
`onNothing` throw400 NotExists ("cleanup config does not exist for " <> triggerNameToTxt triggerName)
|
|
updateCleanupStatusInMetadata @b cleanupConfig cleanupSwitch sourceName tableName triggerName
|
|
pure successMsg
|
|
where
|
|
traverseTableHelper ::
|
|
forall b.
|
|
(Backend b) =>
|
|
TableCache b ->
|
|
EventTriggerCleanupStatus ->
|
|
SourceName ->
|
|
m ()
|
|
traverseTableHelper tableCache switch sourceName = forM_ tableCache $ \tableInfo -> do
|
|
let tableName = (_tciName . _tiCoreInfo) tableInfo
|
|
eventTriggerInfoMap = _tiEventTriggerInfoMap tableInfo
|
|
ifor_ eventTriggerInfoMap $ \triggerName eventTriggerInfo -> do
|
|
for_ (etiCleanupConfig eventTriggerInfo) $ \cleanupConfig ->
|
|
updateCleanupStatusInMetadata @b cleanupConfig switch sourceName tableName triggerName
|
|
|
|
runEventTriggerResumeCleanup ::
|
|
forall m.
|
|
(MonadIO m, QErrM m, CacheRWM m, MetadataM m) =>
|
|
TriggerLogCleanupToggleConfig ->
|
|
m EncJSON
|
|
runEventTriggerResumeCleanup conf = toggleEventTriggerCleanupAction conf ETCSUnpaused
|
|
|
|
runEventTriggerPauseCleanup ::
|
|
(MonadError QErr m, CacheRWM m, MonadIO m, MetadataM m) =>
|
|
TriggerLogCleanupToggleConfig ->
|
|
m EncJSON
|
|
runEventTriggerPauseCleanup conf = toggleEventTriggerCleanupAction conf ETCSPaused
|
|
|
|
-- | Collects and returns all the event triggers with cleanup config
|
|
getAllEventTriggersWithCleanupConfig :: TableInfo b -> [(TriggerName, AutoTriggerLogCleanupConfig)]
|
|
getAllEventTriggersWithCleanupConfig tInfo = mapMaybe (\(triggerName, triggerInfo) -> (triggerName,) <$> etiCleanupConfig triggerInfo) $ Map.toList $ _tiEventTriggerInfoMap tInfo
|
|
|
|
hasCleanupCronScheduleUpdated :: Maybe AutoTriggerLogCleanupConfig -> Maybe AutoTriggerLogCleanupConfig -> Bool
|
|
hasCleanupCronScheduleUpdated Nothing _ = False
|
|
hasCleanupCronScheduleUpdated _ Nothing = True
|
|
hasCleanupCronScheduleUpdated (Just oldConfig) (Just newConfig) =
|
|
_atlccSchedule oldConfig /= _atlccSchedule newConfig
|
|
|
|
getAllETWithCleanupConfigInTableMetadata :: TableMetadata b -> [(TriggerName, AutoTriggerLogCleanupConfig)]
|
|
getAllETWithCleanupConfigInTableMetadata tMetadata =
|
|
mapMaybe
|
|
( \(triggerName, triggerConf) ->
|
|
(triggerName,)
|
|
<$> etcCleanupConfig triggerConf
|
|
)
|
|
$ OMap.toList
|
|
$ _tmEventTriggers tMetadata
|