Mirror of https://github.com/hasura/graphql-engine.git, synced 2024-12-14 17:02:49 +03:00
server, pro: event trigger auto cleanup (increment 1)
PR-URL: https://github.com/hasura/graphql-engine-mono/pull/5612
Co-authored-by: pranshi06 <85474619+pranshi06@users.noreply.github.com>
Co-authored-by: Puru Gupta <32328846+purugupta99@users.noreply.github.com>
Co-authored-by: Karthikeyan Chinnakonda <15602904+codingkarthik@users.noreply.github.com>
GitOrigin-RevId: 6ce69ebb555e49439ae2b01fe42e39415ac53966
parent 2654a1b4f4
commit d6970173c1
@@ -99,6 +99,7 @@ import Hasura.Logging
import Hasura.Metadata.Class
import Hasura.Prelude
import Hasura.QueryTags
import Hasura.RQL.DDL.EventTrigger (MonadEventLogCleanup (runLogCleaner))
import Hasura.RQL.DDL.Schema.Cache
import Hasura.RQL.DDL.Schema.Cache.Common
import Hasura.RQL.DDL.Schema.Catalog
@@ -557,7 +558,8 @@ runHGEServer ::
HasResourceLimits m,
MonadMetadataStorage (MetadataStorageT m),
MonadResolveSource m,
EB.MonadQueryTags m
EB.MonadQueryTags m,
MonadEventLogCleanup m
) =>
(ServerCtx -> Spock.SpockT m ()) ->
Env.Environment ->
@@ -644,7 +646,8 @@ mkHGEServer ::
HasResourceLimits m,
MonadMetadataStorage (MetadataStorageT m),
MonadResolveSource m,
EB.MonadQueryTags m
EB.MonadQueryTags m,
MonadEventLogCleanup m
) =>
(ServerCtx -> Spock.SpockT m ()) ->
Env.Environment ->
@@ -1087,6 +1090,11 @@ instance (Monad m) => MonadResolveSource (PGMetadataStorageAppT m) where
instance (Monad m) => EB.MonadQueryTags (PGMetadataStorageAppT m) where
createQueryTags _attributes _qtSourceConfig = return $ emptyQueryTagsComment

instance (Monad m) => MonadEventLogCleanup (PGMetadataStorageAppT m) where
runLogCleaner _ = pure err
where
err = throw400 NotSupported "Event log cleanup feature is enterprise edition only"

runInSeparateTx ::
(MonadIO m) =>
Q.TxE QErr a ->

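The open-source instance above simply rejects cleanup requests; the capability is meant to be supplied by whichever app monad runs the server. As a hedged illustration only (TestAppT and the message are not part of this commit; it assumes Hasura.Prelude, Data.Aeson qualified as J, encJFromJValue from Hasura.EncJSON, and the DerivingStrategies/GeneralizedNewtypeDeriving extensions), an alternative stack could satisfy the MonadEventLogCleanup class introduced later in this patch like so:

-- Hedged sketch, not part of this commit: a hypothetical app monad that
-- answers cleanup requests with a canned success instead of rejecting them.
newtype TestAppT m a = TestAppT {runTestAppT :: m a}
  deriving newtype (Functor, Applicative, Monad)

instance (Monad m) => MonadEventLogCleanup (TestAppT m) where
  runLogCleaner _conf =
    -- return a JSON body instead of the "enterprise edition only" error above
    pure $ Right $ encJFromJValue $ J.object ["message" J..= ("event log cleanup completed" :: Text)]
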
@@ -18,6 +18,7 @@ module Hasura.Backends.MSSQL.DDL.EventTrigger
qualifyTableName,
createMissingSQLTriggers,
checkIfTriggerExists,
deleteEventTriggerLogs,
)
where

@@ -846,3 +847,67 @@ mkUpdateTriggerQuery
listenColumnExp = unSQLFragment $ mkListenColumnsExp "INSERTED" "DELETED" listenColumns
isPrimaryKeyInListenColumnsExp = unSQLFragment $ isPrimaryKeyInListenColumns listenColumns primaryKey
in $(makeRelativeToProject "src-rsr/mssql/mssql_update_trigger.sql.shakespeare" >>= ST.stextFile)

deleteEventTriggerLogsTx :: TriggerLogCleanupConfig -> TxE QErr DeletedEventLogStats
deleteEventTriggerLogsTx TriggerLogCleanupConfig {..} = do
-- Setting the timeout
unitQueryE
HGE.defaultMSSQLTxErrorHandler
[ODBC.sql|
SET LOCK_TIMEOUT $qTimeout;
|]
-- Select all the dead events based on criteria set in the cleanup config.
deadEventIDs :: [EventId] <-
map EventId
<$> multiRowQueryE
HGE.defaultMSSQLTxErrorHandler
[ODBC.sql|
SELECT TOP ($qBatchSize) CAST(id AS nvarchar(36)) FROM hdb_catalog.event_log WITH (UPDLOCK, READPAST)
WHERE ((delivered = 1 OR error = 1) AND trigger_name = $qTriggerName )
AND created_at < DATEADD(HOUR, - $qRetentionPeriod, CURRENT_TIMESTAMP)
AND locked IS NULL
|]
let generateValuesFromEvents :: [EventId] -> Text --
-- creates a list of event id's (('123-abc'), ('456-vgh'), ('234-asd'))
generateValuesFromEvents events = commaSeparated values
where
values = map (\e -> "('" <> toTxt e <> "')") events
eventIdsValues = generateValuesFromEvents deadEventIDs
-- Lock the events in the database so that other HGE instances don't pick them up for deletion.
unitQueryE HGE.defaultMSSQLTxErrorHandler $
rawUnescapedText . LT.toStrict $
$(makeRelativeToProject "src-rsr/mssql/event_logs_cleanup_sqls/mssql_lock_events.sql.shakespeare" >>= ST.stextFile)
-- Based on the config either delete the corresponding invocation logs or set event_id = NULL
-- (We set event_id to null as we cannot delete the event logs with corresponding invocation logs
-- due to the foreign key constraint)
deletedInvocationLogs :: [Int] <- -- This will be an array of 1 and is only used to count the number of deleted rows.
multiRowQueryE HGE.defaultMSSQLTxErrorHandler $
rawUnescapedText . LT.toStrict $
if tlccCleanInvocationLogs
then $(makeRelativeToProject "src-rsr/mssql/event_logs_cleanup_sqls/mssql_delete_event_invocations.sql.shakespeare" >>= ST.stextFile)
else $(makeRelativeToProject "src-rsr/mssql/event_logs_cleanup_sqls/mssql_null_event_invocations.sql.shakespeare" >>= ST.stextFile)
-- Finally delete the event logs.
deletedEventLogs :: [Int] <- -- This will be an array of 1 and is only used to count the number of deleted rows.
multiRowQueryE HGE.defaultMSSQLTxErrorHandler $
rawUnescapedText . LT.toStrict $
$(makeRelativeToProject "src-rsr/mssql/event_logs_cleanup_sqls/mssql_delete_event.sql.shakespeare" >>= ST.stextFile)
-- Removing the timeout (-1 is the default timeout)
unitQueryE
HGE.defaultMSSQLTxErrorHandler
[ODBC.sql|
SET LOCK_TIMEOUT -1;
|]
pure $ DeletedEventLogStats (length deletedEventLogs) (length deletedInvocationLogs)
where
qTimeout = tlccQueryTimeout * 1000
qTriggerName = triggerNameToTxt tlccEventTriggerName
qRetentionPeriod = tlccRetentionPeriod
qBatchSize = tlccBatchSize

deleteEventTriggerLogs ::
(MonadIO m) =>
MSSQLSourceConfig ->
TriggerLogCleanupConfig ->
m (Either QErr DeletedEventLogStats)
deleteEventTriggerLogs sourceConfig cleanupConfig =
liftIO $ runMSSQLSourceWriteTx sourceConfig $ deleteEventTriggerLogsTx cleanupConfig

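Because each run deletes at most tlccBatchSize event logs, a caller that wants to drain the whole backlog has to loop. A hedged sketch, not part of this commit, using only the types added in this patch (TriggerLogCleanupConfig, DeletedEventLogStats defined in Hasura.RQL.Types.EventTrigger below) plus liftEither from mtl:

-- Hedged sketch, not part of this commit: repeatedly run the MSSQL cleanup
-- until a batch deletes no event logs, accumulating the statistics.
drainMSSQLEventLogs ::
  (MonadIO m, MonadError QErr m) =>
  MSSQLSourceConfig ->
  TriggerLogCleanupConfig ->
  m DeletedEventLogStats
drainMSSQLEventLogs sourceConfig cleanupConfig = go (DeletedEventLogStats 0 0)
  where
    go acc = do
      -- each call locks, deletes and reports one batch (up to tlccBatchSize rows)
      batch <- liftEither =<< deleteEventTriggerLogs sourceConfig cleanupConfig
      let acc' =
            DeletedEventLogStats
              (deletedEventLogs acc + deletedEventLogs batch)
              (deletedInvocationLogs acc + deletedInvocationLogs batch)
      if deletedEventLogs batch == 0
        then pure acc' -- nothing left that matches the retention criteria
        else go acc'
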
@@ -24,6 +24,7 @@ module Hasura.Backends.Postgres.DDL.EventTrigger
unlockEventsInSource,
updateColumnInEventTrigger,
checkIfTriggerExists,
deleteEventTriggerLogs,
)
where

@@ -811,3 +812,99 @@ mkAllTriggersQ triggerName table allCols fullspec = do
onJust (tdInsert fullspec) (mkTrigger triggerName table allCols INSERT)
onJust (tdUpdate fullspec) (mkTrigger triggerName table allCols UPDATE)
onJust (tdDelete fullspec) (mkTrigger triggerName table allCols DELETE)

deleteEventTriggerLogsTx :: TriggerLogCleanupConfig -> Q.TxE QErr DeletedEventLogStats
deleteEventTriggerLogsTx TriggerLogCleanupConfig {..} = do
-- Setting the timeout
Q.unitQE defaultTxErrorHandler (Q.fromText $ "SET statement_timeout = " <> (tshow qTimeout)) () True
-- Select all the dead events based on criteria set in the cleanup config.
deadEventIDs <-
map runIdentity
<$> Q.listQE
defaultTxErrorHandler
[Q.sql|
SELECT id FROM hdb_catalog.event_log
WHERE ((delivered = true OR error = true) AND trigger_name = $1)
AND created_at < now() - interval '$2'
AND locked IS NULL
LIMIT $3
|]
(qTriggerName, qRetentionPeriod, qBatchSize)
True
-- Lock the events in the database so that other HGE instances don't pick them up for deletion.
Q.unitQE
defaultTxErrorHandler
[Q.sql|
UPDATE hdb_catalog.event_log
SET locked = now()
WHERE id = ANY($1::text[]);
|]
(Identity $ PGTextArray $ map unEventId deadEventIDs)
True
-- Based on the config either delete the corresponding invocation logs or set event_id = NULL
-- (We set event_id to null as we cannot delete the event logs with corresponding invocation logs
-- due to the foreign key constraint)
deletedInvocationLogs <-
if tlccCleanInvocationLogs
then
runIdentity . Q.getRow
<$> Q.withQE
defaultTxErrorHandler
[Q.sql|
WITH deletedInvocations AS (
DELETE FROM hdb_catalog.event_invocation_logs
WHERE event_id = ANY($1::text[])
RETURNING 1
)
SELECT count(*) FROM deletedInvocations;
|]
(Identity $ PGTextArray $ map unEventId deadEventIDs)
True
else do
Q.unitQE
defaultTxErrorHandler
[Q.sql|
UPDATE hdb_catalog.event_invocation_logs
SET event_id = NULL
WHERE event_id = ANY($1::text[])
|]
(Identity $ PGTextArray $ map unEventId deadEventIDs)
True
pure 0
-- Finally delete the event logs.
deletedEventLogs <-
runIdentity . Q.getRow
<$> Q.withQE
defaultTxErrorHandler
[Q.sql|
WITH deletedEvents AS (
DELETE FROM hdb_catalog.event_log
WHERE id = ANY($1::text[])
RETURNING 1
)
SELECT count(*) FROM deletedEvents;
|]
(Identity $ PGTextArray $ map unEventId deadEventIDs)
True
-- Resetting the timeout to default value (0)
Q.unitQE
defaultTxErrorHandler
[Q.sql|
SET statement_timeout = 0;
|]
()
False
pure DeletedEventLogStats {..}
where
qTimeout = (fromIntegral $ tlccQueryTimeout * 1000) :: Int64
qTriggerName = triggerNameToTxt tlccEventTriggerName
qRetentionPeriod = tshow tlccRetentionPeriod <> " hours"
qBatchSize = (fromIntegral tlccBatchSize) :: Int64

deleteEventTriggerLogs ::
(MonadIO m) =>
PGSourceConfig ->
TriggerLogCleanupConfig ->
m (Either QErr DeletedEventLogStats)
deleteEventTriggerLogs sourceConfig cleanupConfig =
liftIO $ runPgSourceWriteTx sourceConfig $ deleteEventTriggerLogsTx cleanupConfig

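The Postgres path is driven by the same TriggerLogCleanupConfig record, whose FromJSON instance is added later in this patch in Hasura.RQL.Types.EventTrigger. A hedged sketch, not part of this commit, of the JSON shape that parser accepts (the trigger and source names are illustrative; OverloadedStrings and Data.Aeson's eitherDecode are assumed):

-- Hedged sketch, not part of this commit. Omitted keys fall back to the
-- parser defaults: batch_size 10000, retention_period 168 hours, timeout 60
-- seconds, clean_invocation_logs False, source "default".
exampleManualCleanupConfig :: Either String TriggerLogCleanupConfig
exampleManualCleanupConfig =
  eitherDecode "{\"event_trigger_name\": \"users_audit_trigger\", \"source\": \"default\", \"batch_size\": 5000, \"retention_period\": 24, \"clean_invocation_logs\": true}"
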
@@ -17,6 +17,7 @@ module Hasura.RQL.DDL.EventTrigger
getTriggerNames,
getTriggersMap,
getTableNameFromTrigger,
getTabInfoFromSchemaCache,
cetqSource,
cetqName,
cetqTable,
@@ -31,6 +32,9 @@ module Hasura.RQL.DDL.EventTrigger
cetqReplace,
cetqRequestTransform,
cetqResponseTrasnform,
cteqCleanupConfig,
runCleanupEventTriggerLog,
MonadEventLogCleanup (..),
)
where

@@ -46,6 +50,7 @@ import Data.Text.Extended
import Data.URL.Template (printURLTemplate)
import Hasura.Base.Error
import Hasura.EncJSON
import Hasura.Metadata.Class (MetadataStorageT)
import Hasura.Prelude
import Hasura.RQL.DDL.Headers
import Hasura.RQL.DDL.Webhook.Transform (MetadataResponseTransform, RequestTransform)
@@ -65,6 +70,7 @@ import Hasura.RQL.Types.Table
import Hasura.SQL.AnyBackend qualified as AB
import Hasura.SQL.Backend
import Hasura.Session
import Hasura.Tracing (TraceT)
import Hasura.Tracing qualified as Tracing
import Text.Regex.TDFA qualified as TDFA

@@ -82,7 +88,8 @@ data CreateEventTriggerQuery (b :: BackendType) = CreateEventTriggerQuery
_cetqHeaders :: Maybe [HeaderConf],
_cetqReplace :: Bool,
_cetqRequestTransform :: Maybe RequestTransform,
_cetqResponseTrasnform :: Maybe MetadataResponseTransform
_cetqResponseTrasnform :: Maybe MetadataResponseTransform,
_cteqCleanupConfig :: Maybe AutoTriggerLogCleanupConfig
}

$(makeLenses ''CreateEventTriggerQuery)
@@ -103,6 +110,7 @@ instance Backend b => FromJSON (CreateEventTriggerQuery b) where
replace <- o .:? "replace" .!= False
requestTransform <- o .:? "request_transform"
responseTransform <- o .:? "response_transform"
cleanupConfig <- o .:? "cleanup_config"
let regex = "^[A-Za-z]+[A-Za-z0-9_\\-]*$" :: LBS.ByteString
compiledRegex = TDFA.makeRegex regex :: TDFA.Regex
isMatch = TDFA.match compiledRegex . T.unpack $ triggerNameToTxt name
@@ -118,7 +126,7 @@ instance Backend b => FromJSON (CreateEventTriggerQuery b) where
(Just _, Just _) -> fail "only one of webhook or webhook_from_env should be given"
_ -> fail "must provide webhook or webhook_from_env"
mapM_ checkEmptyCols [insert, update, delete]
return $ CreateEventTriggerQuery sourceName name table insert update delete (Just enableManual) retryConf webhook webhookFromEnv headers replace requestTransform responseTransform
return $ CreateEventTriggerQuery sourceName name table insert update delete (Just enableManual) retryConf webhook webhookFromEnv headers replace requestTransform responseTransform cleanupConfig
where
checkEmptyCols spec =
case spec of
@@ -161,12 +169,29 @@ instance Backend b => FromJSON (InvokeEventTriggerQuery b) where
<*> o .:? "source" .!= defaultSource
<*> o .: "payload"

-- | This typeclass have the implementation logic for the event trigger log cleanup
class Monad m => MonadEventLogCleanup m where
runLogCleaner ::
TriggerLogCleanupConfig -> m (Either QErr EncJSON)

instance (MonadEventLogCleanup m) => MonadEventLogCleanup (ReaderT r m) where
runLogCleaner conf = lift $ runLogCleaner conf

instance (MonadEventLogCleanup m) => MonadEventLogCleanup (MetadataT m) where
runLogCleaner conf = lift $ runLogCleaner conf

instance (MonadEventLogCleanup m) => MonadEventLogCleanup (MetadataStorageT m) where
runLogCleaner conf = lift $ runLogCleaner conf

instance (MonadEventLogCleanup m) => MonadEventLogCleanup (TraceT m) where
runLogCleaner conf = lift $ runLogCleaner conf

resolveEventTriggerQuery ::
forall b m.
(Backend b, UserInfoM m, QErrM m, CacheRM m) =>
CreateEventTriggerQuery b ->
m (Bool, EventTriggerConf b)
resolveEventTriggerQuery (CreateEventTriggerQuery source name qt insert update delete enableManual retryConf webhook webhookFromEnv mheaders replace reqTransform respTransform) = do
resolveEventTriggerQuery (CreateEventTriggerQuery source name qt insert update delete enableManual retryConf webhook webhookFromEnv mheaders replace reqTransform respTransform cleanupConfig) = do
ti <- askTableCoreInfo source qt
-- can only replace for same table
when replace $ do
@@ -178,7 +203,7 @@ resolveEventTriggerQuery (CreateEventTriggerQuery source name qt insert update d
assertCols ti delete

let rconf = fromMaybe defaultRetryConf retryConf
return (replace, EventTriggerConf name (TriggerOpsDef insert update delete enableManual) webhook webhookFromEnv rconf mheaders reqTransform respTransform)
return (replace, EventTriggerConf name (TriggerOpsDef insert update delete enableManual) webhook webhookFromEnv rconf mheaders reqTransform respTransform cleanupConfig)
where
assertCols :: TableCoreInfo b -> Maybe (SubscribeOpSpec b) -> m ()
assertCols ti opSpec = onJust opSpec \sos -> case sosColumns sos of
@@ -380,7 +405,7 @@ buildEventTriggerInfo ::
TableName b ->
EventTriggerConf b ->
m (EventTriggerInfo b, [SchemaDependency])
buildEventTriggerInfo env source tableName (EventTriggerConf name def webhook webhookFromEnv rconf mheaders reqTransform respTransform) = do
buildEventTriggerInfo env source tableName (EventTriggerConf name def webhook webhookFromEnv rconf mheaders reqTransform respTransform cleanupConfig) = do
webhookConf <- case (webhook, webhookFromEnv) of
(Just w, Nothing) -> return $ WCValue w
(Nothing, Just wEnv) -> return $ WCEnv wEnv
@@ -388,7 +413,7 @@ buildEventTriggerInfo env source tableName (EventTriggerConf name def webhook we
let headerConfs = fromMaybe [] mheaders
webhookInfo <- getWebhookInfoFromConf env webhookConf
headerInfos <- getHeaderInfosFromConf env headerConfs
let eTrigInfo = EventTriggerInfo name def rconf webhookInfo headerInfos reqTransform respTransform
let eTrigInfo = EventTriggerInfo name def rconf webhookInfo headerInfos reqTransform respTransform cleanupConfig
tabDep =
SchemaDependency
( SOSourceObj source $
@@ -450,3 +475,9 @@ getTableNameFromTrigger ::
m (TableName b)
getTableNameFromTrigger schemaCache sourceName triggerName =
(_tciName . _tiCoreInfo) <$> getTabInfoFromSchemaCache @b schemaCache sourceName triggerName

runCleanupEventTriggerLog ::
(MonadEventLogCleanup m, MonadError QErr m) =>
TriggerLogCleanupConfig ->
m EncJSON
runCleanupEventTriggerLog conf = runLogCleaner conf >>= (flip onLeft) throwError

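The class is deliberately small: every transformer in the server's stack just lifts runLogCleaner, as the ReaderT/MetadataT/MetadataStorageT/TraceT instances above do. A hedged sketch of the instance any additional transformer would need (AuditT is a hypothetical name, not part of this commit; DerivingStrategies and GeneralizedNewtypeDeriving are assumed):

-- Hedged sketch, not part of this commit: a hypothetical transformer simply
-- threads the cleanup capability through by lifting.
newtype AuditT m a = AuditT {runAuditT :: m a}
  deriving newtype (Functor, Applicative, Monad)

instance MonadTrans AuditT where
  lift = AuditT

instance (MonadEventLogCleanup m) => MonadEventLogCleanup (AuditT m) where
  runLogCleaner conf = lift $ runLogCleaner conf
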
@@ -45,7 +45,7 @@ import Hasura.Metadata.Class
import Hasura.Prelude
import Hasura.RQL.DDL.Action
import Hasura.RQL.DDL.CustomTypes
import Hasura.RQL.DDL.EventTrigger (buildEventTriggerInfo)
import Hasura.RQL.DDL.EventTrigger (MonadEventLogCleanup (runLogCleaner), buildEventTriggerInfo)
import Hasura.RQL.DDL.InheritedRoles (resolveInheritedRole)
import Hasura.RQL.DDL.RemoteRelationship (CreateRemoteSchemaRemoteRelationship (..), PartiallyResolvedSource (..), buildRemoteFieldInfo, getRemoteSchemaEntityJoinColumns)
import Hasura.RQL.DDL.RemoteSchema
@@ -177,6 +177,9 @@ newtype CacheRWT m a
MonadBaseControl b
)

instance (MonadEventLogCleanup m) => MonadEventLogCleanup (CacheRWT m) where
runLogCleaner conf = lift $ runLogCleaner conf

runCacheRWT ::
Functor m =>
RebuildableSchemaCache ->

@@ -201,7 +201,7 @@ addEventTriggerToCatalog qt etc = liftTx do
False
where
QualifiedObject sn tn = qt
(EventTriggerConf name _ _ _ _ _ _ _) = etc
(EventTriggerConf name _ _ _ _ _ _ _ _) = etc

addComputedFieldToCatalog ::
MonadTx m =>

@@ -25,6 +25,9 @@ module Hasura.RQL.Types.EventTrigger
EventTriggerInfoMap,
EventTriggerInfo (..),
FetchBatchSize (..),
AutoTriggerLogCleanupConfig (..),
TriggerLogCleanupConfig (..),
DeletedEventLogStats (..),
)
where

@@ -40,9 +43,10 @@ import Hasura.Prelude
import Hasura.RQL.DDL.Headers
import Hasura.RQL.DDL.Webhook.Transform (MetadataResponseTransform, RequestTransform)
import Hasura.RQL.Types.Backend
import Hasura.RQL.Types.Common (EnvRecord, InputWebhook, ResolvedWebhook, SourceName)
import Hasura.RQL.Types.Common (EnvRecord, InputWebhook, ResolvedWebhook, SourceName (..))
import Hasura.RQL.Types.Eventing
import Hasura.SQL.Backend
import System.Cron (CronSchedule)

-- | Unique name for event trigger.
newtype TriggerName = TriggerName {unTriggerName :: NonEmptyText}
@@ -195,6 +199,76 @@ instance Backend b => FromJSON (TriggerOpsDef b) where
instance Backend b => ToJSON (TriggerOpsDef b) where
toJSON = genericToJSON hasuraJSON {omitNothingFields = True}

-- | Automatic event trigger log cleanup configuration
data AutoTriggerLogCleanupConfig = AutoTriggerLogCleanupConfig
{ -- | cron schedule for the automatic cleanup
_atlccSchedule :: CronSchedule,
-- | maximum number of events to be deleted in a single cleanup action
_atlccBatchSize :: Int,
-- | retention period (in hours) for the event trigger logs
_atlccRetentionPeriod :: Int,
-- | SQL query timeout (in seconds)
_atlccQueryTimeout :: Int,
-- | should we clean the invocation logs as well
_atlccCleanInvocationLogs :: Bool,
-- | is the cleanup action paused
_atlccPaused :: Bool
}
deriving (Show, Eq, Generic)

instance NFData AutoTriggerLogCleanupConfig

instance Cacheable AutoTriggerLogCleanupConfig

instance FromJSON AutoTriggerLogCleanupConfig where
parseJSON =
withObject "AutoTriggerLogCleanupConfig" $ \o -> do
_atlccSchedule <- o .: "schedule"
_atlccBatchSize <- o .:? "batch_size" .!= 10000
_atlccRetentionPeriod <- o .:? "retention_period" .!= 168 -- 7 Days = 168 hours
_atlccQueryTimeout <- o .:? "timeout" .!= 60
_atlccCleanInvocationLogs <- o .:? "clean_invocation_logs" .!= False
_atlccPaused <- o .:? "paused" .!= False
pure AutoTriggerLogCleanupConfig {..}

instance ToJSON AutoTriggerLogCleanupConfig where
toJSON = genericToJSON hasuraJSON {omitNothingFields = True}

-- | Manual event trigger log cleanup configuration
data TriggerLogCleanupConfig = TriggerLogCleanupConfig
{ -- | name of the event trigger
tlccEventTriggerName :: TriggerName,
-- | source of the event trigger
tlccSourceName :: SourceName,
-- | batch size of for the cleanup action
tlccBatchSize :: Int,
-- | retention period (in hours) for the event trigger logs
tlccRetentionPeriod :: Int,
-- | SQL query timeout (in seconds)
tlccQueryTimeout :: Int,
-- | should we clean the invocation logs as well
tlccCleanInvocationLogs :: Bool
}
deriving (Show, Eq, Generic)

instance NFData TriggerLogCleanupConfig

instance Cacheable TriggerLogCleanupConfig

instance FromJSON TriggerLogCleanupConfig where
parseJSON =
withObject "TriggerLogCleanupConfig" $ \o -> do
tlccEventTriggerName <- o .: "event_trigger_name"
tlccSourceName <- o .:? "source" .!= SNDefault
tlccBatchSize <- o .:? "batch_size" .!= 10000
tlccRetentionPeriod <- o .:? "retention_period" .!= 168 -- 7 Days = 168 hours
tlccQueryTimeout <- o .:? "timeout" .!= 60
tlccCleanInvocationLogs <- o .:? "clean_invocation_logs" .!= False
pure TriggerLogCleanupConfig {..}

instance ToJSON TriggerLogCleanupConfig where
toJSON = genericToJSON hasuraJSON {omitNothingFields = True}

data EventTriggerConf (b :: BackendType) = EventTriggerConf
{ etcName :: TriggerName,
etcDefinition :: TriggerOpsDef b,
@@ -203,7 +277,8 @@ data EventTriggerConf (b :: BackendType) = EventTriggerConf
etcRetryConf :: RetryConf,
etcHeaders :: Maybe [HeaderConf],
etcRequestTransform :: Maybe RequestTransform,
etcResponseTransform :: Maybe MetadataResponseTransform
etcResponseTransform :: Maybe MetadataResponseTransform,
etcCleanupConfig :: Maybe AutoTriggerLogCleanupConfig
}
deriving (Show, Eq, Generic)

@@ -282,7 +357,8 @@ data EventTriggerInfo (b :: BackendType) = EventTriggerInfo
-- headers added.
etiHeaders :: [EventHeaderInfo],
etiRequestTransform :: Maybe RequestTransform,
etiResponseTransform :: Maybe MetadataResponseTransform
etiResponseTransform :: Maybe MetadataResponseTransform,
etiCleanupConfig :: Maybe AutoTriggerLogCleanupConfig
}
deriving (Generic, Eq)

@@ -295,3 +371,9 @@ type EventTriggerInfoMap b = M.HashMap TriggerName (EventTriggerInfo b)

newtype FetchBatchSize = FetchBatchSize {_unFetchBatchSize :: Int}
deriving (Show, Eq)

-- | Statistics of deleted event logs and invocation logs
data DeletedEventLogStats = DeletedEventLogStats
{ deletedEventLogs :: Int,
deletedInvocationLogs :: Int
}

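To make the defaults above concrete, a hedged sketch, not part of this commit, of a minimal "cleanup_config" object being parsed; it assumes OverloadedStrings, Data.Aeson's eitherDecode, and that the FromJSON instance for CronSchedule already used by the surrounding code is in scope:

-- Hedged sketch, not part of this commit. Only "schedule" is mandatory; every
-- omitted key takes the default wired into the parser above: batch_size 10000,
-- retention_period 168 (hours), timeout 60 (seconds),
-- clean_invocation_logs False, paused False.
exampleAutoCleanupConfig :: Either String AutoTriggerLogCleanupConfig
exampleAutoCleanupConfig =
  eitherDecode "{\"schedule\": \"0 0 * * *\", \"retention_period\": 72}"
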
@@ -219,6 +219,12 @@ class Backend b => BackendEventTrigger (b :: BackendType) where
HashSet Ops ->
m Bool

deleteEventTriggerLogs ::
(MonadIO m, MonadError QErr m) =>
SourceConfig b ->
TriggerLogCleanupConfig ->
m (Either QErr DeletedEventLogStats)

--------------------------------------------------------------------------------
-- TODO: move those instances to 'Backend/*/Instances/Eventing' and create a
-- corresponding 'Instances.hs' file in this directory to import them, similarly
@@ -241,6 +247,7 @@ instance BackendEventTrigger ('Postgres 'Vanilla) where
createTableEventTrigger = PG.createTableEventTrigger
createMissingSQLTriggers = PG.createMissingSQLTriggers
checkIfTriggerExists = PG.checkIfTriggerExists
deleteEventTriggerLogs = PG.deleteEventTriggerLogs

instance BackendEventTrigger ('Postgres 'Citus) where
insertManualEvent _ _ _ _ _ _ = throw400 NotSupported $ "Event triggers are not supported for Citus sources"
@@ -257,6 +264,7 @@ instance BackendEventTrigger ('Postgres 'Citus) where
createTableEventTrigger _ _ _ _ _ _ _ = runExceptT $ throw400 NotSupported "Event triggers are not supported for Citus sources"
createMissingSQLTriggers _ _ _ _ _ = throw400 NotSupported $ "Event triggers are not supported for Citus sources"
checkIfTriggerExists _ _ _ = throw400 NotSupported $ "Event triggers are not supported for Citus sources"
deleteEventTriggerLogs _ _ = throw400 NotSupported $ "Event triggers are not supported for Citus sources"

instance BackendEventTrigger ('Postgres 'Cockroach) where
insertManualEvent = PG.insertManualEvent
@@ -273,6 +281,7 @@ instance BackendEventTrigger ('Postgres 'Cockroach) where
createTableEventTrigger = PG.createTableEventTrigger
createMissingSQLTriggers = PG.createMissingSQLTriggers
checkIfTriggerExists = PG.checkIfTriggerExists
deleteEventTriggerLogs = PG.deleteEventTriggerLogs

instance BackendEventTrigger 'MSSQL where
insertManualEvent = MSSQL.insertManualEvent
@@ -289,6 +298,7 @@ instance BackendEventTrigger 'MSSQL where
createTableEventTrigger = MSSQL.createTableEventTrigger
createMissingSQLTriggers = MSSQL.createMissingSQLTriggers
checkIfTriggerExists = MSSQL.checkIfTriggerExists
deleteEventTriggerLogs = MSSQL.deleteEventTriggerLogs

instance BackendEventTrigger 'BigQuery where
insertManualEvent _ _ _ _ _ _ = throw400 NotSupported $ "Event triggers are not supported for BigQuery sources"
@@ -305,6 +315,7 @@ instance BackendEventTrigger 'BigQuery where
createTableEventTrigger _ _ _ _ _ _ _ = runExceptT $ throw400 NotSupported "Event triggers are not supported for BigQuery sources"
createMissingSQLTriggers _ _ _ _ _ = throw400 NotSupported $ "Event triggers are not supported for BigQuery sources"
checkIfTriggerExists _ _ _ = throw400 NotSupported $ "Event triggers are not supported for BigQuery sources"
deleteEventTriggerLogs _ _ = throw400 NotSupported $ "Event triggers are not supported for BigQuery sources"

instance BackendEventTrigger 'MySQL where
insertManualEvent _ _ _ _ _ _ = throw400 NotSupported $ "Event triggers are not supported for MySQL sources"
@@ -321,6 +332,7 @@ instance BackendEventTrigger 'MySQL where
createTableEventTrigger _ _ _ _ _ _ _ = runExceptT $ throw400 NotSupported "Event triggers are not supported for MySQL sources"
createMissingSQLTriggers _ _ _ _ _ = throw400 NotSupported $ "Event triggers are not supported for MySQL sources"
checkIfTriggerExists _ _ _ = throw400 NotSupported $ "Event triggers are not supported for MySQL sources"
deleteEventTriggerLogs _ _ = throw400 NotSupported $ "Event triggers are not supported for MySQL sources"

--------------------------------------------------------------------------------

@@ -355,3 +367,4 @@ instance BackendEventTrigger 'DataConnector where
runExceptT $ throw400 NotSupported "Event triggers are not supported for the Data Connector backend."
createMissingSQLTriggers _ _ _ _ _ = throw400 NotSupported $ "Event triggers are not supported for Data Connector backend."
checkIfTriggerExists _ _ _ = throw400 NotSupported $ "Event triggers are not supported for Data Connector backend."
deleteEventTriggerLogs _ _ = throw400 NotSupported $ "Event triggers are not supported for Data Connector sources"

@@ -309,7 +309,7 @@ sourcesToOrdJSONList sources =
<> catMaybes [maybeCommentToMaybeOrdPair comment]

eventTriggerConfToOrdJSON :: Backend b => EventTriggerConf b -> AO.Value
eventTriggerConfToOrdJSON (EventTriggerConf name definition webhook webhookFromEnv retryConf headers reqTransform respTransform) =
eventTriggerConfToOrdJSON (EventTriggerConf name definition webhook webhookFromEnv retryConf headers reqTransform respTransform cleanupConfig) =
AO.object $
[ ("name", AO.toOrdered name),
("definition", AO.toOrdered definition),
@@ -320,7 +320,8 @@ sourcesToOrdJSONList sources =
maybeAnyToMaybeOrdPair "webhook_from_env" AO.toOrdered webhookFromEnv,
headers >>= listToMaybeOrdPair "headers" AO.toOrdered,
fmap (("request_transform",) . AO.toOrdered) reqTransform,
fmap (("response_transform",) . AO.toOrdered) respTransform
fmap (("response_transform",) . AO.toOrdered) respTransform,
maybeAnyToMaybeOrdPair "cleanup_config" AO.toOrdered cleanupConfig
]

functionMetadataToOrdJSON :: Backend b => FunctionMetadata b -> AO.Value

@@ -11,6 +11,7 @@ import Control.Monad.Trans.Control (MonadBaseControl)
import Hasura.Base.Error
import Hasura.Metadata.Class
import Hasura.Prelude
import Hasura.RQL.DDL.EventTrigger (MonadEventLogCleanup (runLogCleaner))
import Hasura.RQL.Types.Source
import Hasura.Server.Types
import Hasura.Session
@@ -52,6 +53,9 @@ instance (MonadResolveSource m) => MonadResolveSource (RunT m) where
getPGSourceResolver = RunT . lift . lift $ getPGSourceResolver
getMSSQLSourceResolver = RunT . lift . lift $ getMSSQLSourceResolver

instance (MonadEventLogCleanup m) => MonadEventLogCleanup (RunT m) where
runLogCleaner conf = RunT . lift . lift $ runLogCleaner conf

peelRun ::
RunCtx ->
RunT m a ->

@@ -52,6 +52,7 @@ import Hasura.RQL.Types.ApiLimit
import Hasura.RQL.Types.Common
import Hasura.RQL.Types.CustomTypes
import Hasura.RQL.Types.Endpoint
import Hasura.RQL.Types.EventTrigger
import Hasura.RQL.Types.Eventing.Backend
import Hasura.RQL.Types.GraphqlSchemaIntrospection
import Hasura.RQL.Types.Metadata
@@ -128,6 +129,7 @@ data RQLMetadataV1
| RMDeleteEventTrigger !(AnyBackend DeleteEventTriggerQuery)
| RMRedeliverEvent !(AnyBackend RedeliverEventQuery)
| RMInvokeEventTrigger !(AnyBackend InvokeEventTriggerQuery)
| RMCleanupEventTriggerLog !TriggerLogCleanupConfig
| -- Remote schemas
RMAddRemoteSchema !AddRemoteSchemaQuery
| RMUpdateRemoteSchema !AddRemoteSchemaQuery
@@ -225,6 +227,7 @@ instance FromJSON RQLMetadataV1 where
"create_remote_schema_remote_relationship" -> RMCreateRemoteSchemaRemoteRelationship <$> args
"update_remote_schema_remote_relationship" -> RMUpdateRemoteSchemaRemoteRelationship <$> args
"delete_remote_schema_remote_relationship" -> RMDeleteRemoteSchemaRemoteRelationship <$> args
"cleanup_event_trigger_logs" -> RMCleanupEventTriggerLog <$> args
"create_cron_trigger" -> RMCreateCronTrigger <$> args
"delete_cron_trigger" -> RMDeleteCronTrigger <$> args
"create_scheduled_event" -> RMCreateScheduledEvent <$> args
@@ -350,7 +353,8 @@ runMetadataQuery ::
MonadBaseControl IO m,
Tracing.MonadTrace m,
MonadMetadataStorage m,
MonadResolveSource m
MonadResolveSource m,
MonadEventLogCleanup m
) =>
Env.Environment ->
L.Logger L.Hasura ->
@@ -436,7 +440,8 @@ runMetadataQueryM ::
MonadMetadataStorageQueryAPI m,
HasServerConfigCtx m,
MonadReader r m,
Has (L.Logger L.Hasura) r
Has (L.Logger L.Hasura) r,
MonadEventLogCleanup m
) =>
Env.Environment ->
MetadataResourceVersion ->
@@ -465,7 +470,8 @@ runMetadataQueryV1M ::
MonadMetadataStorageQueryAPI m,
HasServerConfigCtx m,
MonadReader r m,
Has (L.Logger L.Hasura) r
Has (L.Logger L.Hasura) r,
MonadEventLogCleanup m
) =>
Env.Environment ->
MetadataResourceVersion ->
@@ -519,6 +525,7 @@ runMetadataQueryV1M env currentResourceVersion = \case
RMDeleteEventTrigger q -> dispatchMetadataAndEventTrigger runDeleteEventTriggerQuery q
RMRedeliverEvent q -> dispatchEventTrigger runRedeliverEvent q
RMInvokeEventTrigger q -> dispatchEventTrigger runInvokeEventTrigger q
RMCleanupEventTriggerLog q -> runCleanupEventTriggerLog q
RMAddRemoteSchema q -> runAddRemoteSchema env q
RMUpdateRemoteSchema q -> runUpdateRemoteSchema env q
RMRemoveRemoteSchema q -> runRemoveRemoteSchema q

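Putting the API surface together: a hedged sketch, not part of this commit, of the request body that the new "cleanup_event_trigger_logs" branch dispatches to runCleanupEventTriggerLog, written as an aeson Value (the trigger and source names are illustrative; OverloadedStrings and Data.Aeson's object and (.=) are assumed):

-- Hedged sketch, not part of this commit: the type/args envelope used by the
-- metadata API, with args matching the TriggerLogCleanupConfig parser.
cleanupEventTriggerLogsRequest :: Value
cleanupEventTriggerLogsRequest =
  object
    [ "type" .= ("cleanup_event_trigger_logs" :: Text),
      "args"
        .= object
          [ "event_trigger_name" .= ("users_audit_trigger" :: Text),
            "source" .= ("default" :: Text),
            "batch_size" .= (5000 :: Int),
            "retention_period" .= (24 :: Int),
            "clean_invocation_logs" .= True
          ]
    ]
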
@@ -22,6 +22,7 @@ import Hasura.RQL.Types.ApiLimit
import Hasura.RQL.Types.Common
import Hasura.RQL.Types.CustomTypes
import Hasura.RQL.Types.Endpoint
import Hasura.RQL.Types.EventTrigger
import Hasura.RQL.Types.GraphqlSchemaIntrospection
import Hasura.RQL.Types.Metadata
import Hasura.RQL.Types.Network
@@ -83,6 +84,7 @@ data RQLMetadataV1
| RMDeleteEventTrigger !(AnyBackend DeleteEventTriggerQuery)
| RMRedeliverEvent !(AnyBackend RedeliverEventQuery)
| RMInvokeEventTrigger !(AnyBackend InvokeEventTriggerQuery)
| RMCleanupEventTriggerLog !TriggerLogCleanupConfig
| -- Remote schemas
RMAddRemoteSchema !AddRemoteSchemaQuery
| RMUpdateRemoteSchema !AddRemoteSchemaQuery

@@ -63,6 +63,7 @@ import Hasura.HTTP
import Hasura.Logging qualified as L
import Hasura.Metadata.Class
import Hasura.Prelude hiding (get, put)
import Hasura.RQL.DDL.EventTrigger (MonadEventLogCleanup)
import Hasura.RQL.DDL.Schema
import Hasura.RQL.Types.Common
import Hasura.RQL.Types.Endpoint as EP
@@ -442,7 +443,8 @@ v1MetadataHandler ::
Tracing.MonadTrace m,
MonadMetadataStorage m,
MonadResolveSource m,
MonadMetadataApiAuthorization m
MonadMetadataApiAuthorization m,
MonadEventLogCleanup m
) =>
RQLMetadata ->
m (HttpResponse EncJSON)
@@ -742,7 +744,8 @@ mkWaiApp ::
HasResourceLimits m,
MonadMetadataStorage (MetadataStorageT m),
MonadResolveSource m,
EB.MonadQueryTags m
EB.MonadQueryTags m,
MonadEventLogCleanup m
) =>
(ServerCtx -> Spock.SpockT m ()) ->
-- | Set of environment variables for reference in UIs
@@ -902,7 +905,8 @@ httpApp ::
MonadMetadataStorage (MetadataStorageT m),
HasResourceLimits m,
MonadResolveSource m,
EB.MonadQueryTags m
EB.MonadQueryTags m,
MonadEventLogCleanup m
) =>
(ServerCtx -> Spock.SpockT m ()) ->
CorsConfig ->

@@ -75,8 +75,8 @@ from3To4 = liftTx $
) ->
EventTriggerConf ('Postgres 'Vanilla)
uncurryEventTrigger (trn, Q.AltJ tDef, w, nr, rint, Q.AltJ headers) =
EventTriggerConf trn tDef (Just w) Nothing (RetryConf nr rint Nothing) headers Nothing Nothing
updateEventTrigger3To4 etc@(EventTriggerConf name _ _ _ _ _ _ _) =
EventTriggerConf trn tDef (Just w) Nothing (RetryConf nr rint Nothing) headers Nothing Nothing Nothing
updateEventTrigger3To4 etc@(EventTriggerConf name _ _ _ _ _ _ _ _) =
Q.unitQ
[Q.sql|
UPDATE hdb_catalog.event_triggers

src-rsr/mssql/event_logs_cleanup_sqls/mssql_delete_event.sql.shakespeare
@@ -0,0 +1,4 @@
DELETE FROM hdb_catalog.event_log
OUTPUT 1
WHERE id =
ANY ( SELECT id from (VALUES #{eventIdsValues}) AS X(id))

src-rsr/mssql/event_logs_cleanup_sqls/mssql_delete_event_invocations.sql.shakespeare
@@ -0,0 +1,4 @@
DELETE FROM hdb_catalog.event_invocation_logs
OUTPUT 1
WHERE event_id =
ANY ( SELECT id from (VALUES #{eventIdsValues}) AS X(id))

src-rsr/mssql/event_logs_cleanup_sqls/mssql_lock_events.sql.shakespeare
@@ -0,0 +1,5 @@
UPDATE hdb_catalog.event_log
SET locked = CURRENT_TIMESTAMP
WHERE id =
ANY ( SELECT id from (VALUES #{eventIdsValues}) AS X(id))
AND locked IS NULL

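For reference, a hedged illustration, not part of this commit, of roughly what the lock template above renders to once the MSSQL code splices in eventIdsValues from generateValuesFromEvents (the event ids are made up; Text and OverloadedStrings are assumed):

-- Hedged illustration only: the rendered statement for a two-event batch.
renderedLockEventsSql :: Text
renderedLockEventsSql =
  "UPDATE hdb_catalog.event_log\n\
  \SET locked = CURRENT_TIMESTAMP\n\
  \WHERE id =\n\
  \ANY ( SELECT id from (VALUES ('2f5f8abc-0001'), ('2f5f8abc-0002')) AS X(id))\n\
  \AND locked IS NULL"
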
src-rsr/mssql/event_logs_cleanup_sqls/mssql_null_event_invocations.sql.shakespeare
@@ -0,0 +1,4 @@
UPDATE hdb_catalog.event_invocation_logs
SET event_id = NULL
WHERE event_id =
ANY ( SELECT id from (VALUES #{eventIdsValues}) AS X(id))