mirror of
https://github.com/hasura/graphql-engine.git
synced 2024-12-19 05:21:47 +03:00
f4fa960ec6
Hooks up event trigger codecs from #7237. This required fixing a problem where some backend types implemented `defaultTriggerOnReplication` with `error` which caused the server to crash when evaluating those for default values in codecs. The changes here add a type family to `Backend` called `XEventTriggers` that signals backend support for event triggers, and changes the type of `defaultTriggerOnReplication` to from `TriggerOnReplication` to `Maybe (XEventTriggers b, TriggerOnReplication)` so that it can only be implemented with a `Just` value if `XEventTriggers b` is inhabited. This emulates some existing type families in `Backend`. (Thanks to @daniel-chambers for this suggestion!) I used the implementation of `defaultTriggerOnReplication` as a signal for event triggers support to prune the Metadata API so that event trigger fields will not appear in the OpenAPI spec for backend types that do not support event triggers. The codec version of the API will also not emit or accept those fields for those backend types. I think I could use `Typeable` to test whether `XEventTriggers` is `Void` instead of testing whether `defaultTriggerOnReplication` is `Nothing`. But the codec implementation will crash anyway if `defaultTriggerOnReplication` is `Nothing`. I checked to make sure that graphql-engine-pro still compiles. Ticket: https://hasurahq.atlassian.net/browse/GDC-521 PR-URL: https://github.com/hasura/graphql-engine-mono/pull/7295 GitOrigin-RevId: 2b2dd44291513266107ca25cf330319bf53a8b66
686 lines
26 KiB
Haskell
686 lines
26 KiB
Haskell
{-# LANGUAGE TemplateHaskell #-}
|
|
|
|
module Hasura.RQL.DDL.EventTrigger
|
|
( CreateEventTriggerQuery,
|
|
runCreateEventTriggerQuery,
|
|
DeleteEventTriggerQuery,
|
|
runDeleteEventTriggerQuery,
|
|
dropEventTriggerInMetadata,
|
|
RedeliverEventQuery,
|
|
runRedeliverEvent,
|
|
InvokeEventTriggerQuery,
|
|
runInvokeEventTrigger,
|
|
-- TODO(from master): review
|
|
getHeaderInfosFromConf,
|
|
getWebhookInfoFromConf,
|
|
buildEventTriggerInfo,
|
|
getTriggerNames,
|
|
getTriggersMap,
|
|
getTableNameFromTrigger,
|
|
getTabInfoFromSchemaCache,
|
|
cetqSource,
|
|
cetqName,
|
|
cetqTable,
|
|
cetqInsert,
|
|
cetqUpdate,
|
|
cetqDelete,
|
|
cetqEnableManual,
|
|
cetqRetryConf,
|
|
cetqWebhook,
|
|
cetqWebhookFromEnv,
|
|
cetqHeaders,
|
|
cetqReplace,
|
|
cetqRequestTransform,
|
|
cetqResponseTrasnform,
|
|
cteqCleanupConfig,
|
|
cteqTriggerOnReplication,
|
|
runCleanupEventTriggerLog,
|
|
runEventTriggerResumeCleanup,
|
|
runEventTriggerPauseCleanup,
|
|
MonadEventLogCleanup (..),
|
|
getAllEventTriggersWithCleanupConfig,
|
|
getAllETWithCleanupConfigInTableMetadata,
|
|
)
|
|
where
|
|
|
|
import Control.Lens (ifor_, makeLenses, (.~))
|
|
import Data.Aeson
|
|
import Data.ByteString.Lazy qualified as LBS
|
|
import Data.Environment qualified as Env
|
|
import Data.Has (Has)
|
|
import Data.HashMap.Strict qualified as HM
|
|
import Data.HashMap.Strict qualified as Map
|
|
import Data.HashMap.Strict.InsOrd qualified as OMap
|
|
import Data.HashSet qualified as Set
|
|
import Data.Sequence qualified as Seq
|
|
import Data.Text qualified as T
|
|
import Data.Text.Extended
|
|
import Data.URL.Template (printURLTemplate)
|
|
import Hasura.Base.Error
|
|
import Hasura.EncJSON
|
|
import Hasura.Eventing.EventTrigger (logQErr)
|
|
import Hasura.Logging qualified as L
|
|
import Hasura.Metadata.Class (MetadataStorageT)
|
|
import Hasura.Prelude
|
|
import Hasura.RQL.DDL.Headers
|
|
import Hasura.RQL.DDL.Webhook.Transform (MetadataResponseTransform, RequestTransform)
|
|
import Hasura.RQL.Types.Backend
|
|
import Hasura.RQL.Types.Common
|
|
import Hasura.RQL.Types.EventTrigger
|
|
import Hasura.RQL.Types.Eventing
|
|
import Hasura.RQL.Types.Eventing.Backend
|
|
import Hasura.RQL.Types.Metadata
|
|
import Hasura.RQL.Types.Metadata.Backend
|
|
import Hasura.RQL.Types.Metadata.Object
|
|
import Hasura.RQL.Types.SchemaCache
|
|
import Hasura.RQL.Types.SchemaCache.Build
|
|
import Hasura.RQL.Types.SchemaCacheTypes
|
|
import Hasura.RQL.Types.Source
|
|
import Hasura.RQL.Types.Table
|
|
import Hasura.SQL.AnyBackend qualified as AB
|
|
import Hasura.SQL.Backend
|
|
import Hasura.Session
|
|
import Hasura.Tracing (TraceT)
|
|
import Hasura.Tracing qualified as Tracing
|
|
import Text.Regex.TDFA qualified as TDFA
|
|
|
|
data CreateEventTriggerQuery (b :: BackendType) = CreateEventTriggerQuery
|
|
{ _cetqSource :: SourceName,
|
|
_cetqName :: TriggerName,
|
|
_cetqTable :: TableName b,
|
|
_cetqInsert :: Maybe (SubscribeOpSpec b),
|
|
_cetqUpdate :: Maybe (SubscribeOpSpec b),
|
|
_cetqDelete :: Maybe (SubscribeOpSpec b),
|
|
_cetqEnableManual :: Maybe Bool,
|
|
_cetqRetryConf :: Maybe RetryConf,
|
|
_cetqWebhook :: Maybe InputWebhook,
|
|
_cetqWebhookFromEnv :: Maybe Text,
|
|
_cetqHeaders :: Maybe [HeaderConf],
|
|
_cetqReplace :: Bool,
|
|
_cetqRequestTransform :: Maybe RequestTransform,
|
|
_cetqResponseTrasnform :: Maybe MetadataResponseTransform,
|
|
_cteqCleanupConfig :: Maybe AutoTriggerLogCleanupConfig,
|
|
_cteqTriggerOnReplication :: TriggerOnReplication
|
|
}
|
|
|
|
$(makeLenses ''CreateEventTriggerQuery)
|
|
|
|
instance Backend b => FromJSON (CreateEventTriggerQuery b) where
|
|
parseJSON = withObject "CreateEventTriggerQuery" \o -> do
|
|
sourceName <- o .:? "source" .!= defaultSource
|
|
name <- o .: "name"
|
|
table <- o .: "table"
|
|
insert <- o .:? "insert"
|
|
update <- o .:? "update"
|
|
delete <- o .:? "delete"
|
|
enableManual <- o .:? "enable_manual" .!= False
|
|
retryConf <- o .:? "retry_conf"
|
|
webhook <- o .:? "webhook"
|
|
webhookFromEnv <- o .:? "webhook_from_env"
|
|
headers <- o .:? "headers"
|
|
replace <- o .:? "replace" .!= False
|
|
requestTransform <- o .:? "request_transform"
|
|
responseTransform <- o .:? "response_transform"
|
|
cleanupConfig <- o .:? "cleanup_config"
|
|
let regex = "^[A-Za-z]+[A-Za-z0-9_\\-]*$" :: LBS.ByteString
|
|
compiledRegex = TDFA.makeRegex regex :: TDFA.Regex
|
|
isMatch = TDFA.match compiledRegex . T.unpack $ triggerNameToTxt name
|
|
unless isMatch $
|
|
fail "only alphanumeric and underscore and hyphens allowed for name"
|
|
unless (T.length (triggerNameToTxt name) <= maxTriggerNameLength) $
|
|
fail "event trigger name can be at most 42 characters"
|
|
unless (any isJust [insert, update, delete] || enableManual) $
|
|
fail "atleast one amongst insert/update/delete/enable_manual spec must be provided"
|
|
case (webhook, webhookFromEnv) of
|
|
(Just _, Nothing) -> return ()
|
|
(Nothing, Just _) -> return ()
|
|
(Just _, Just _) -> fail "only one of webhook or webhook_from_env should be given"
|
|
_ -> fail "must provide webhook or webhook_from_env"
|
|
mapM_ checkEmptyCols [insert, update, delete]
|
|
defTOR <- case defaultTriggerOnReplication @b of
|
|
Just (_, dt) -> pure dt
|
|
Nothing -> fail "No default setting for trigger_on_replication is defined for backend type."
|
|
triggerOnReplication <- o .:? "trigger_on_replication" .!= defTOR
|
|
return $ CreateEventTriggerQuery sourceName name table insert update delete (Just enableManual) retryConf webhook webhookFromEnv headers replace requestTransform responseTransform cleanupConfig triggerOnReplication
|
|
where
|
|
checkEmptyCols spec =
|
|
case spec of
|
|
Just (SubscribeOpSpec (SubCArray cols) _) -> when (null cols) (fail "found empty column specification")
|
|
Just (SubscribeOpSpec _ (Just (SubCArray cols))) -> when (null cols) (fail "found empty payload specification")
|
|
_ -> return ()
|
|
|
|
data DeleteEventTriggerQuery (b :: BackendType) = DeleteEventTriggerQuery
|
|
{ _detqSource :: SourceName,
|
|
_detqName :: TriggerName
|
|
}
|
|
|
|
instance FromJSON (DeleteEventTriggerQuery b) where
|
|
parseJSON = withObject "DeleteEventTriggerQuery" $ \o ->
|
|
DeleteEventTriggerQuery
|
|
<$> o .:? "source" .!= defaultSource
|
|
<*> o .: "name"
|
|
|
|
data RedeliverEventQuery (b :: BackendType) = RedeliverEventQuery
|
|
{ _rdeqEventId :: EventId,
|
|
_rdeqSource :: SourceName
|
|
}
|
|
|
|
instance FromJSON (RedeliverEventQuery b) where
|
|
parseJSON = withObject "RedeliverEventQuery" $ \o ->
|
|
RedeliverEventQuery
|
|
<$> o .: "event_id"
|
|
<*> o .:? "source" .!= defaultSource
|
|
|
|
data InvokeEventTriggerQuery (b :: BackendType) = InvokeEventTriggerQuery
|
|
{ _ietqName :: TriggerName,
|
|
_ietqSource :: SourceName,
|
|
_ietqPayload :: Value
|
|
}
|
|
|
|
instance Backend b => FromJSON (InvokeEventTriggerQuery b) where
|
|
parseJSON = withObject "InvokeEventTriggerQuery" $ \o ->
|
|
InvokeEventTriggerQuery
|
|
<$> o .: "name"
|
|
<*> o .:? "source" .!= defaultSource
|
|
<*> o .: "payload"
|
|
|
|
-- | This typeclass have the implementation logic for the event trigger log cleanup
|
|
class Monad m => MonadEventLogCleanup m where
|
|
-- Deletes the logs of event triggers
|
|
runLogCleaner ::
|
|
TriggerLogCleanupConfig -> m (Either QErr EncJSON)
|
|
|
|
-- Generates the cleanup schedules for event triggers which have log cleaners installed
|
|
generateCleanupSchedules ::
|
|
AB.AnyBackend SourceInfo -> TriggerName -> AutoTriggerLogCleanupConfig -> m (Either QErr ())
|
|
|
|
instance (MonadEventLogCleanup m) => MonadEventLogCleanup (ReaderT r m) where
|
|
runLogCleaner conf = lift $ runLogCleaner conf
|
|
generateCleanupSchedules sourceInfo triggerName cleanupConfig = lift $ generateCleanupSchedules sourceInfo triggerName cleanupConfig
|
|
|
|
instance (MonadEventLogCleanup m) => MonadEventLogCleanup (MetadataT m) where
|
|
runLogCleaner conf = lift $ runLogCleaner conf
|
|
generateCleanupSchedules sourceInfo triggerName cleanupConfig = lift $ generateCleanupSchedules sourceInfo triggerName cleanupConfig
|
|
|
|
instance (MonadEventLogCleanup m) => MonadEventLogCleanup (MetadataStorageT m) where
|
|
runLogCleaner conf = lift $ runLogCleaner conf
|
|
generateCleanupSchedules sourceInfo triggerName cleanupConfig = lift $ generateCleanupSchedules sourceInfo triggerName cleanupConfig
|
|
|
|
instance (MonadEventLogCleanup m) => MonadEventLogCleanup (TraceT m) where
|
|
runLogCleaner conf = lift $ runLogCleaner conf
|
|
generateCleanupSchedules sourceInfo triggerName cleanupConfig = lift $ generateCleanupSchedules sourceInfo triggerName cleanupConfig
|
|
|
|
resolveEventTriggerQuery ::
|
|
forall b m.
|
|
(Backend b, UserInfoM m, QErrM m, CacheRM m) =>
|
|
CreateEventTriggerQuery b ->
|
|
m (Bool, EventTriggerConf b)
|
|
resolveEventTriggerQuery (CreateEventTriggerQuery source name qt insert update delete enableManual retryConf webhook webhookFromEnv mheaders replace reqTransform respTransform cleanupConfig triggerOnReplication) = do
|
|
ti <- askTableCoreInfo source qt
|
|
-- can only replace for same table
|
|
when replace $ do
|
|
ti' <- _tiCoreInfo <$> askTabInfoFromTrigger @b source name
|
|
when (_tciName ti' /= _tciName ti) $ throw400 NotSupported "cannot replace table or schema for trigger"
|
|
|
|
assertCols ti insert
|
|
assertCols ti update
|
|
assertCols ti delete
|
|
|
|
let rconf = fromMaybe defaultRetryConf retryConf
|
|
return (replace, EventTriggerConf name (TriggerOpsDef insert update delete enableManual) webhook webhookFromEnv rconf mheaders reqTransform respTransform cleanupConfig triggerOnReplication)
|
|
where
|
|
assertCols :: TableCoreInfo b -> Maybe (SubscribeOpSpec b) -> m ()
|
|
assertCols ti opSpec = for_ opSpec \sos -> case sosColumns sos of
|
|
SubCStar -> return ()
|
|
SubCArray columns -> forM_ columns (assertColumnExists @b (_tciFieldInfoMap ti) "")
|
|
|
|
droppedTriggerOps :: TriggerOpsDef b -> TriggerOpsDef b -> HashSet Ops
|
|
droppedTriggerOps oldEventTriggerOps newEventTriggerOps =
|
|
Set.fromList $
|
|
catMaybes $
|
|
[ (bool Nothing (Just INSERT) (isDroppedOp (tdInsert oldEventTriggerOps) (tdInsert newEventTriggerOps))),
|
|
(bool Nothing (Just UPDATE) (isDroppedOp (tdUpdate oldEventTriggerOps) (tdUpdate newEventTriggerOps))),
|
|
(bool Nothing (Just DELETE) (isDroppedOp (tdDelete oldEventTriggerOps) (tdDelete newEventTriggerOps)))
|
|
]
|
|
where
|
|
isDroppedOp old new = isJust old && isNothing new
|
|
|
|
createEventTriggerQueryMetadata ::
|
|
forall b m r.
|
|
( BackendMetadata b,
|
|
QErrM m,
|
|
UserInfoM m,
|
|
CacheRWM m,
|
|
MetadataM m,
|
|
BackendEventTrigger b,
|
|
MonadIO m,
|
|
MonadEventLogCleanup m,
|
|
MonadReader r m,
|
|
Has (L.Logger L.Hasura) r
|
|
) =>
|
|
CreateEventTriggerQuery b ->
|
|
m ()
|
|
createEventTriggerQueryMetadata q = do
|
|
(replace, triggerConf) <- resolveEventTriggerQuery q
|
|
let table = _cetqTable q
|
|
source = _cetqSource q
|
|
triggerName = etcName triggerConf
|
|
metadataObj =
|
|
MOSourceObjId source $
|
|
AB.mkAnyBackend $
|
|
SMOTableObj @b table $
|
|
MTOTrigger triggerName
|
|
sourceInfo <- askSourceInfo @b source
|
|
let sourceConfig = (_siConfiguration sourceInfo)
|
|
newConfig = _cteqCleanupConfig q
|
|
|
|
-- Check for existence of a trigger with 'triggerName' only when 'replace' is not set
|
|
if replace
|
|
then do
|
|
existingEventTriggerOps <- etiOpsDef <$> askEventTriggerInfo @b source triggerName
|
|
let droppedOps = droppedTriggerOps existingEventTriggerOps (etcDefinition triggerConf)
|
|
dropDanglingSQLTrigger @b (_siConfiguration sourceInfo) triggerName table droppedOps
|
|
|
|
-- check if cron schedule for the cleanup config has changed then delete the scheduled cleanups
|
|
oldConfig <- etiCleanupConfig <$> askEventTriggerInfo @b source triggerName
|
|
when (hasCleanupCronScheduleUpdated oldConfig newConfig) do
|
|
deleteAllScheduledCleanups @b sourceConfig triggerName
|
|
for_ newConfig \cleanupConfig -> do
|
|
(`onLeft` logQErr) =<< generateCleanupSchedules (AB.mkAnyBackend sourceInfo) triggerName cleanupConfig
|
|
else do
|
|
doesTriggerExists <- checkIfTriggerExists @b sourceConfig triggerName (Set.fromList [INSERT, UPDATE, DELETE])
|
|
if doesTriggerExists
|
|
then throw400 AlreadyExists ("Event trigger with name " <> triggerNameToTxt triggerName <<> " already exists")
|
|
else for_ newConfig \cleanupConfig -> do
|
|
(`onLeft` logQErr) =<< generateCleanupSchedules (AB.mkAnyBackend sourceInfo) triggerName cleanupConfig
|
|
|
|
buildSchemaCacheFor metadataObj $
|
|
MetadataModifier $
|
|
tableMetadataSetter @b source table . tmEventTriggers
|
|
%~ if replace
|
|
then ix triggerName .~ triggerConf
|
|
else OMap.insert triggerName triggerConf
|
|
|
|
runCreateEventTriggerQuery ::
|
|
forall b m r.
|
|
( BackendMetadata b,
|
|
BackendEventTrigger b,
|
|
QErrM m,
|
|
UserInfoM m,
|
|
CacheRWM m,
|
|
MetadataM m,
|
|
MonadIO m,
|
|
MonadEventLogCleanup m,
|
|
MonadReader r m,
|
|
Has (L.Logger L.Hasura) r
|
|
) =>
|
|
CreateEventTriggerQuery b ->
|
|
m EncJSON
|
|
runCreateEventTriggerQuery q = do
|
|
createEventTriggerQueryMetadata @b q
|
|
pure successMsg
|
|
|
|
runDeleteEventTriggerQuery ::
|
|
forall b m.
|
|
(BackendEventTrigger b, MonadError QErr m, CacheRWM m, MonadIO m, MetadataM m) =>
|
|
DeleteEventTriggerQuery b ->
|
|
m EncJSON
|
|
runDeleteEventTriggerQuery (DeleteEventTriggerQuery sourceName triggerName) = do
|
|
sourceConfig <- askSourceConfig @b sourceName
|
|
tableName <- (_tciName . _tiCoreInfo) <$> askTabInfoFromTrigger @b sourceName triggerName
|
|
|
|
withNewInconsistentObjsCheck $
|
|
buildSchemaCache $
|
|
MetadataModifier $
|
|
tableMetadataSetter @b sourceName tableName %~ dropEventTriggerInMetadata triggerName
|
|
|
|
dropTriggerAndArchiveEvents @b sourceConfig triggerName tableName
|
|
|
|
deleteAllScheduledCleanups @b sourceConfig triggerName
|
|
|
|
pure successMsg
|
|
|
|
runRedeliverEvent ::
|
|
forall b m.
|
|
(BackendEventTrigger b, MonadIO m, CacheRM m, QErrM m, MetadataM m) =>
|
|
RedeliverEventQuery b ->
|
|
m EncJSON
|
|
runRedeliverEvent (RedeliverEventQuery eventId source) = do
|
|
sourceConfig <- askSourceConfig @b source
|
|
redeliverEvent @b sourceConfig eventId
|
|
pure successMsg
|
|
|
|
runInvokeEventTrigger ::
|
|
forall b m.
|
|
( MonadIO m,
|
|
QErrM m,
|
|
CacheRM m,
|
|
MetadataM m,
|
|
Tracing.MonadTrace m,
|
|
UserInfoM m,
|
|
BackendEventTrigger b
|
|
) =>
|
|
InvokeEventTriggerQuery b ->
|
|
m EncJSON
|
|
runInvokeEventTrigger (InvokeEventTriggerQuery name source payload) = do
|
|
trigInfo <- askEventTriggerInfo @b source name
|
|
assertManual $ etiOpsDef trigInfo
|
|
ti <- askTabInfoFromTrigger source name
|
|
sourceConfig <- askSourceConfig @b source
|
|
traceCtx <- Tracing.currentContext
|
|
userInfo <- askUserInfo
|
|
eid <- insertManualEvent @b sourceConfig (tableInfoName @b ti) name (makePayload payload) userInfo traceCtx
|
|
return $ encJFromJValue $ object ["event_id" .= eid]
|
|
where
|
|
makePayload o = object ["old" .= Null, "new" .= o]
|
|
|
|
assertManual (TriggerOpsDef _ _ _ man) = case man of
|
|
Just True -> return ()
|
|
_ -> throw400 NotSupported "manual mode is not enabled for event trigger"
|
|
|
|
askTabInfoFromTrigger ::
|
|
(Backend b, QErrM m, CacheRM m) =>
|
|
SourceName ->
|
|
TriggerName ->
|
|
m (TableInfo b)
|
|
askTabInfoFromTrigger sourceName triggerName = do
|
|
schemaCache <- askSchemaCache
|
|
tableInfoMaybe <- getTabInfoFromSchemaCache schemaCache sourceName triggerName
|
|
tableInfoMaybe `onNothing` throw400 NotExists errMsg
|
|
where
|
|
errMsg = "event trigger " <> triggerName <<> " does not exist"
|
|
|
|
getTabInfoFromSchemaCache ::
|
|
(Backend b, QErrM m) =>
|
|
SchemaCache ->
|
|
SourceName ->
|
|
TriggerName ->
|
|
m (Maybe (TableInfo b))
|
|
getTabInfoFromSchemaCache schemaCache sourceName triggerName = do
|
|
let tabInfos = HM.elems $ fromMaybe mempty $ unsafeTableCache sourceName $ scSources schemaCache
|
|
pure $ find (isJust . HM.lookup triggerName . _tiEventTriggerInfoMap) tabInfos
|
|
|
|
askEventTriggerInfo ::
|
|
forall b m.
|
|
(QErrM m, CacheRM m, Backend b) =>
|
|
SourceName ->
|
|
TriggerName ->
|
|
m (EventTriggerInfo b)
|
|
askEventTriggerInfo sourceName triggerName = do
|
|
triggerInfo <- askTabInfoFromTrigger @b sourceName triggerName
|
|
let eventTriggerInfoMap = _tiEventTriggerInfoMap triggerInfo
|
|
HM.lookup triggerName eventTriggerInfoMap `onNothing` throw400 NotExists errMsg
|
|
where
|
|
errMsg = "event trigger " <> triggerName <<> " does not exist"
|
|
|
|
-- This change helps us create functions for the event triggers
|
|
-- without the function name being truncated by PG, since PG allows
|
|
-- for only 63 chars for identifiers.
|
|
-- Reasoning for the 42 characters:
|
|
-- 63 - (notify_hasura_) - (_INSERT | _UPDATE | _DELETE)
|
|
maxTriggerNameLength :: Int
|
|
maxTriggerNameLength = 42
|
|
|
|
getHeaderInfosFromConf ::
|
|
QErrM m =>
|
|
Env.Environment ->
|
|
[HeaderConf] ->
|
|
m [EventHeaderInfo]
|
|
getHeaderInfosFromConf env = mapM getHeader
|
|
where
|
|
getHeader :: QErrM m => HeaderConf -> m EventHeaderInfo
|
|
getHeader hconf = case hconf of
|
|
(HeaderConf _ (HVValue val)) -> return $ EventHeaderInfo hconf val
|
|
(HeaderConf _ (HVEnv val)) -> do
|
|
envVal <- getEnv env val
|
|
return $ EventHeaderInfo hconf envVal
|
|
|
|
getWebhookInfoFromConf ::
|
|
QErrM m =>
|
|
Env.Environment ->
|
|
WebhookConf ->
|
|
m WebhookConfInfo
|
|
getWebhookInfoFromConf env webhookConf = case webhookConf of
|
|
WCValue w -> do
|
|
resolvedWebhook <- resolveWebhook env w
|
|
let urlTemplate = printURLTemplate $ unInputWebhook w
|
|
-- `urlTemplate` can either be the template value({{TEST}}) or a plain text.
|
|
-- When `urlTemplate` is a template value then '_envVarName' of the 'EnvRecord'
|
|
-- will be the template value i.e '{{TEST}}'
|
|
-- When `urlTemplate` is a plain text then '_envVarName' of the 'EnvRecord' will be the plain text value.
|
|
return $ WebhookConfInfo webhookConf (EnvRecord urlTemplate resolvedWebhook)
|
|
WCEnv webhookEnvVar -> do
|
|
envVal <- getEnv env webhookEnvVar
|
|
return $ WebhookConfInfo webhookConf (EnvRecord webhookEnvVar (ResolvedWebhook envVal))
|
|
|
|
buildEventTriggerInfo ::
|
|
forall b m.
|
|
(Backend b, QErrM m) =>
|
|
Env.Environment ->
|
|
SourceName ->
|
|
TableName b ->
|
|
EventTriggerConf b ->
|
|
m (EventTriggerInfo b, Seq SchemaDependency)
|
|
buildEventTriggerInfo
|
|
env
|
|
source
|
|
tableName
|
|
( EventTriggerConf
|
|
name
|
|
def
|
|
webhook
|
|
webhookFromEnv
|
|
rconf
|
|
mheaders
|
|
reqTransform
|
|
respTransform
|
|
cleanupConfig
|
|
triggerOnReplication
|
|
) = do
|
|
webhookConf <- case (webhook, webhookFromEnv) of
|
|
(Just w, Nothing) -> return $ WCValue w
|
|
(Nothing, Just wEnv) -> return $ WCEnv wEnv
|
|
_ -> throw500 "expected webhook or webhook_from_env"
|
|
let headerConfs = fromMaybe [] mheaders
|
|
webhookInfo <- getWebhookInfoFromConf env webhookConf
|
|
headerInfos <- getHeaderInfosFromConf env headerConfs
|
|
let eTrigInfo =
|
|
EventTriggerInfo
|
|
name
|
|
def
|
|
rconf
|
|
webhookInfo
|
|
headerInfos
|
|
reqTransform
|
|
respTransform
|
|
cleanupConfig
|
|
triggerOnReplication
|
|
tabDep =
|
|
SchemaDependency
|
|
( SOSourceObj source $
|
|
AB.mkAnyBackend $
|
|
SOITable @b tableName
|
|
)
|
|
DRParent
|
|
pure (eTrigInfo, tabDep Seq.:<| getTrigDefDeps @b source tableName def)
|
|
|
|
getTrigDefDeps ::
|
|
forall b.
|
|
Backend b =>
|
|
SourceName ->
|
|
TableName b ->
|
|
TriggerOpsDef b ->
|
|
Seq SchemaDependency
|
|
getTrigDefDeps source tableName (TriggerOpsDef mIns mUpd mDel _) =
|
|
mconcat $
|
|
Seq.fromList
|
|
<$> catMaybes
|
|
[ subsOpSpecDeps <$> mIns,
|
|
subsOpSpecDeps <$> mUpd,
|
|
subsOpSpecDeps <$> mDel
|
|
]
|
|
where
|
|
subsOpSpecDeps :: SubscribeOpSpec b -> [SchemaDependency]
|
|
subsOpSpecDeps os =
|
|
let cols = getColsFromSub $ sosColumns os
|
|
mkColDependency dependencyReason col =
|
|
SchemaDependency
|
|
( SOSourceObj source $
|
|
AB.mkAnyBackend $
|
|
SOITableObj @b tableName (TOCol @b col)
|
|
)
|
|
dependencyReason
|
|
colDeps = map (mkColDependency DRColumn) cols
|
|
payload = maybe [] getColsFromSub (sosPayload os)
|
|
payloadDeps = map (mkColDependency DRPayload) payload
|
|
in colDeps <> payloadDeps
|
|
getColsFromSub sc = case sc of
|
|
SubCStar -> []
|
|
SubCArray cols -> cols
|
|
|
|
getTriggersMap ::
|
|
SourceMetadata b ->
|
|
InsOrdHashMap TriggerName (EventTriggerConf b)
|
|
getTriggersMap = OMap.unions . map _tmEventTriggers . OMap.elems . _smTables
|
|
|
|
getTriggerNames ::
|
|
SourceMetadata b ->
|
|
Set.HashSet TriggerName
|
|
getTriggerNames = Set.fromList . OMap.keys . getTriggersMap
|
|
|
|
getTableNameFromTrigger ::
|
|
forall b m.
|
|
(Backend b, QErrM m) =>
|
|
SchemaCache ->
|
|
SourceName ->
|
|
TriggerName ->
|
|
m (Maybe (TableName b))
|
|
getTableNameFromTrigger schemaCache sourceName triggerName = do
|
|
tableInfoMaybe <- getTabInfoFromSchemaCache @b schemaCache sourceName triggerName
|
|
case tableInfoMaybe of
|
|
Nothing -> pure Nothing
|
|
Just tableInfo -> pure $ Just $ (_tciName . _tiCoreInfo) $ tableInfo
|
|
|
|
runCleanupEventTriggerLog ::
|
|
(MonadEventLogCleanup m, MonadError QErr m) =>
|
|
TriggerLogCleanupConfig ->
|
|
m EncJSON
|
|
runCleanupEventTriggerLog conf = runLogCleaner conf >>= (flip onLeft) throwError
|
|
|
|
-- | Updates the cleanup switch in metadata given the source, table and trigger name
|
|
-- The Bool value represents the status of the cleaner, whether to start or pause it
|
|
updateCleanupStatusInMetadata ::
|
|
forall b m.
|
|
(Backend b, QErrM m, CacheRWM m, MetadataM m) =>
|
|
AutoTriggerLogCleanupConfig ->
|
|
EventTriggerCleanupStatus ->
|
|
SourceName ->
|
|
TableName b ->
|
|
TriggerName ->
|
|
m ()
|
|
updateCleanupStatusInMetadata cleanupConfig cleanupSwitch sourceName tableName triggerName = do
|
|
let newCleanupConfig = Just $ cleanupConfig {_atlccPaused = cleanupSwitch}
|
|
metadataObj =
|
|
MOSourceObjId sourceName $
|
|
AB.mkAnyBackend $
|
|
SMOTableObj @b tableName $
|
|
MTOTrigger triggerName
|
|
|
|
buildSchemaCacheFor metadataObj $
|
|
MetadataModifier $
|
|
tableMetadataSetter @b sourceName tableName . tmEventTriggers . ix triggerName %~ updateCleanupConfig newCleanupConfig
|
|
|
|
-- | Function to start/stop the cleanup action based on the event triggers supplied in
|
|
-- TriggerLogCleanupToggleConfig conf
|
|
toggleEventTriggerCleanupAction ::
|
|
forall m.
|
|
(MonadIO m, QErrM m, CacheRWM m, MetadataM m) =>
|
|
TriggerLogCleanupToggleConfig ->
|
|
EventTriggerCleanupStatus ->
|
|
m EncJSON
|
|
toggleEventTriggerCleanupAction conf cleanupSwitch = do
|
|
schemaCache <- askSchemaCache
|
|
case conf of
|
|
TriggerLogCleanupSources tlcs -> do
|
|
case tlcs of
|
|
TriggerAllSource -> do
|
|
ifor_ (scSources schemaCache) $ \sourceName backendSourceInfo -> do
|
|
AB.dispatchAnyBackend @BackendEventTrigger backendSourceInfo \(SourceInfo _ tableCache _ _ _ _ :: SourceInfo b) -> do
|
|
traverseTableHelper tableCache cleanupSwitch sourceName
|
|
TriggerSource sourceNameLst -> do
|
|
forM_ sourceNameLst $ \sourceName -> do
|
|
backendSourceInfo <-
|
|
HM.lookup sourceName (scSources schemaCache)
|
|
`onNothing` throw400 NotExists ("source with name " <> sourceNameToText sourceName <> " does not exists")
|
|
|
|
AB.dispatchAnyBackend @BackendEventTrigger backendSourceInfo \(SourceInfo _ tableCache _ _ _ _ :: SourceInfo b) -> do
|
|
traverseTableHelper tableCache cleanupSwitch sourceName
|
|
TriggerQualifier qualifierLst -> do
|
|
forM_ qualifierLst $ \qualifier -> do
|
|
let sourceName = _etqSourceName qualifier
|
|
triggerNames = _etqEventTriggers qualifier
|
|
|
|
backendSourceInfo <-
|
|
HM.lookup sourceName (scSources schemaCache)
|
|
`onNothing` throw400 NotExists ("source with name " <> sourceNameToText sourceName <> " does not exists")
|
|
|
|
AB.dispatchAnyBackend @BackendEventTrigger backendSourceInfo \(SourceInfo {} :: SourceInfo b) -> do
|
|
forM_ triggerNames $ \triggerName -> do
|
|
eventTriggerInfo <- askEventTriggerInfo @b sourceName triggerName
|
|
tableNameMaybe <- getTableNameFromTrigger @b schemaCache sourceName triggerName
|
|
case tableNameMaybe of
|
|
Nothing -> throw400 NotExists $ "event trigger " <> triggerName <<> " does not exist"
|
|
Just tableName -> do
|
|
cleanupConfig <-
|
|
(etiCleanupConfig eventTriggerInfo)
|
|
`onNothing` throw400 NotExists ("cleanup config does not exist for " <> triggerNameToTxt triggerName)
|
|
updateCleanupStatusInMetadata @b cleanupConfig cleanupSwitch sourceName tableName triggerName
|
|
pure successMsg
|
|
where
|
|
traverseTableHelper ::
|
|
forall b.
|
|
(Backend b) =>
|
|
TableCache b ->
|
|
EventTriggerCleanupStatus ->
|
|
SourceName ->
|
|
m ()
|
|
traverseTableHelper tableCache switch sourceName = forM_ tableCache $ \tableInfo -> do
|
|
let tableName = (_tciName . _tiCoreInfo) tableInfo
|
|
eventTriggerInfoMap = _tiEventTriggerInfoMap tableInfo
|
|
ifor_ eventTriggerInfoMap $ \triggerName eventTriggerInfo -> do
|
|
for_ (etiCleanupConfig eventTriggerInfo) $ \cleanupConfig ->
|
|
updateCleanupStatusInMetadata @b cleanupConfig switch sourceName tableName triggerName
|
|
|
|
runEventTriggerResumeCleanup ::
|
|
forall m.
|
|
(MonadIO m, QErrM m, CacheRWM m, MetadataM m) =>
|
|
TriggerLogCleanupToggleConfig ->
|
|
m EncJSON
|
|
runEventTriggerResumeCleanup conf = toggleEventTriggerCleanupAction conf ETCSUnpaused
|
|
|
|
runEventTriggerPauseCleanup ::
|
|
(MonadError QErr m, CacheRWM m, MonadIO m, MetadataM m) =>
|
|
TriggerLogCleanupToggleConfig ->
|
|
m EncJSON
|
|
runEventTriggerPauseCleanup conf = toggleEventTriggerCleanupAction conf ETCSPaused
|
|
|
|
-- | Collects and returns all the event triggers with cleanup config
|
|
getAllEventTriggersWithCleanupConfig :: TableInfo b -> [(TriggerName, AutoTriggerLogCleanupConfig)]
|
|
getAllEventTriggersWithCleanupConfig tInfo = mapMaybe (\(triggerName, triggerInfo) -> (triggerName,) <$> etiCleanupConfig triggerInfo) $ Map.toList $ _tiEventTriggerInfoMap tInfo
|
|
|
|
hasCleanupCronScheduleUpdated :: Maybe AutoTriggerLogCleanupConfig -> Maybe AutoTriggerLogCleanupConfig -> Bool
|
|
hasCleanupCronScheduleUpdated Nothing _ = False
|
|
hasCleanupCronScheduleUpdated _ Nothing = True
|
|
hasCleanupCronScheduleUpdated (Just oldConfig) (Just newConfig) =
|
|
_atlccSchedule oldConfig /= _atlccSchedule newConfig
|
|
|
|
getAllETWithCleanupConfigInTableMetadata :: TableMetadata b -> [(TriggerName, AutoTriggerLogCleanupConfig)]
|
|
getAllETWithCleanupConfigInTableMetadata tMetadata =
|
|
mapMaybe
|
|
( \(triggerName, triggerConf) ->
|
|
(triggerName,)
|
|
<$> etcCleanupConfig triggerConf
|
|
)
|
|
$ OMap.toList
|
|
$ _tmEventTriggers tMetadata
|