mirror of
https://github.com/hasura/graphql-engine.git
synced 2024-12-18 04:51:35 +03:00
342391f39d
This upgrades the version of Ormolu required by the HGE repository to v0.5.0.1, and reformats all code accordingly. Ormolu v0.5 reformats code that uses infix operators. This is mostly useful, adding newlines and indentation to make it clear which operators are applied first, but in some cases, it's unpleasant. To make this easier on the eyes, I had to do the following: * Add a few fixity declarations (search for `infix`) * Add parentheses to make precedence clear, allowing Ormolu to keep everything on one line * Rename `relevantEq` to `(==~)` in #6651 and set it to `infix 4` * Add a few _.ormolu_ files (thanks to @hallettj for helping me get started), mostly for Autodocodec operators that don't have explicit fixity declarations In general, I think these changes are quite reasonable. They mostly affect indentation. PR-URL: https://github.com/hasura/graphql-engine-mono/pull/6675 GitOrigin-RevId: cd47d87f1d089fb0bc9dcbbe7798dbceedcd7d83
1139 lines
41 KiB
Haskell
1139 lines
41 KiB
Haskell
{-# LANGUAGE QuasiQuotes #-}
|
|
{-# LANGUAGE TemplateHaskell #-}
|
|
|
|
-- | Postgres DDL EventTrigger
|
|
--
|
|
-- Used for creating event triggers for metadata changes.
|
|
--
|
|
-- See 'Hasura.RQL.DDL.Schema.Cache' and 'Hasura.RQL.Types.Eventing.Backend'.
|
|
module Hasura.Backends.Postgres.DDL.EventTrigger
|
|
( insertManualEvent,
|
|
redeliverEvent,
|
|
dropTriggerAndArchiveEvents,
|
|
createTableEventTrigger,
|
|
createMissingSQLTriggers,
|
|
dropTriggerQ,
|
|
dropDanglingSQLTrigger,
|
|
mkAllTriggersQ,
|
|
getMaintenanceModeVersion,
|
|
fetchUndeliveredEvents,
|
|
setRetry,
|
|
recordSuccess,
|
|
recordError,
|
|
recordError',
|
|
unlockEventsInSource,
|
|
updateColumnInEventTrigger,
|
|
checkIfTriggerExists,
|
|
addCleanupSchedules,
|
|
deleteAllScheduledCleanups,
|
|
getCleanupEventsForDeletion,
|
|
updateCleanupEventStatusToDead,
|
|
updateCleanupEventStatusToPaused,
|
|
updateCleanupEventStatusToCompleted,
|
|
deleteEventTriggerLogs,
|
|
)
|
|
where
|
|
|
|
import Control.Monad.Trans.Control (MonadBaseControl)
|
|
import Data.Aeson
|
|
import Data.FileEmbed (makeRelativeToProject)
|
|
import Data.HashMap.Strict qualified as Map
|
|
import Data.HashSet qualified as HashSet
|
|
import Data.Int (Int64)
|
|
import Data.Set.NonEmpty qualified as NE
|
|
import Data.Text.Lazy qualified as TL
|
|
import Data.Time.Clock qualified as Time
|
|
import Database.PG.Query qualified as PG
|
|
import Hasura.Backends.Postgres.Connection
|
|
import Hasura.Backends.Postgres.SQL.DML
|
|
import Hasura.Backends.Postgres.SQL.DML qualified as S
|
|
import Hasura.Backends.Postgres.SQL.Types hiding (TableName)
|
|
import Hasura.Backends.Postgres.Translate.Column
|
|
import Hasura.Base.Error
|
|
import Hasura.Eventing.Common
|
|
import Hasura.Prelude
|
|
import Hasura.RQL.Types.Backend (Backend, SourceConfig, TableName)
|
|
import Hasura.RQL.Types.Column
|
|
import Hasura.RQL.Types.Common
|
|
import Hasura.RQL.Types.EventTrigger
|
|
import Hasura.RQL.Types.Eventing
|
|
import Hasura.RQL.Types.ScheduledTrigger (formatTime')
|
|
import Hasura.RQL.Types.Source
|
|
import Hasura.RQL.Types.Table (PrimaryKey)
|
|
import Hasura.SQL.Backend
|
|
import Hasura.SQL.Types
|
|
import Hasura.Server.Migrate.Internal
|
|
import Hasura.Server.Migrate.LatestVersion
|
|
import Hasura.Server.Migrate.Version
|
|
import Hasura.Server.Types
|
|
import Hasura.Session
|
|
import Hasura.Tracing qualified as Tracing
|
|
import Text.Builder qualified as TB
|
|
import Text.Shakespeare.Text qualified as ST
|
|
|
|
-- | Lock and fetch a batch of undelivered events for the given triggers.
--
-- When maintenance mode is enabled, the maintenance-mode version is first read
-- from the source (in a read transaction) and used to pick the matching fetch
-- query; a failure there is rethrown with a descriptive prefix. Otherwise the
-- regular 'fetchEvents' query is used. The chosen query is then run through
-- 'runPgSourceWriteTx'.
fetchUndeliveredEvents ::
  (MonadIO m, MonadError QErr m) =>
  SourceConfig ('Postgres pgKind) ->
  SourceName ->
  [TriggerName] ->
  MaintenanceMode () ->
  FetchBatchSize ->
  m [Event ('Postgres pgKind)]
fetchUndeliveredEvents sourceConfig sourceName triggerNames maintenanceMode fetchBatchSize =
  chooseFetchTx >>= either rethrowWithPrefix runFetchTx
  where
    -- Either the error hit while reading the maintenance-mode version, or the
    -- transaction that should be used to fetch the events.
    chooseFetchTx = case maintenanceMode of
      MaintenanceModeDisabled ->
        pure (Right (fetchEvents sourceName triggerNames fetchBatchSize))
      MaintenanceModeEnabled () ->
        fmap (fetchEventsMaintenanceMode sourceName triggerNames fetchBatchSize)
          <$> liftIO (runPgSourceReadTx sourceConfig getMaintenanceModeVersionTx)

    rethrowWithPrefix err =
      throwError (prefixQErr "something went wrong while fetching events: " err)

    runFetchTx fetchTx =
      liftEitherM (liftIO (runPgSourceWriteTx sourceConfig fetchTx))
|
|
|
|
-- | Schedule the given event for another delivery attempt at @retryTime@.
--
-- Runs 'setRetryTx' through 'runPgSourceWriteTx' and rethrows any resulting
-- 'QErr' in @m@.
setRetry ::
  ( MonadIO m,
    MonadError QErr m
  ) =>
  SourceConfig ('Postgres pgKind) ->
  Event ('Postgres pgKind) ->
  Time.UTCTime ->
  MaintenanceMode MaintenanceModeVersion ->
  m ()
setRetry sourceConfig event retryTime maintenanceModeVersion =
  liftEitherM (liftIO (runPgSourceWriteTx sourceConfig retryTx))
  where
    retryTx = setRetryTx event retryTime maintenanceModeVersion
|
|
|
|
-- | Insert a manually-invoked event for the given table and trigger and return
-- the id of the new event.
--
-- NOTE: 'setHeadersTx' and 'setTraceContextInTx' are run first, in the same
-- transaction, so that the user info and trace context hold valid values when
-- the PG function @insert_event_log@ reads them. See Issue(#7087) for details
-- on the bug caused by their absence.
insertManualEvent ::
  (MonadIO m, MonadError QErr m) =>
  SourceConfig ('Postgres pgKind) ->
  TableName ('Postgres pgKind) ->
  TriggerName ->
  Value ->
  UserInfo ->
  Tracing.TraceContext ->
  m EventId
insertManualEvent sourceConfig tableName triggerName payload userInfo traceCtx =
  liftEitherM . liftIO . runPgSourceWriteTx sourceConfig $
    setHeadersTx (_uiSession userInfo)
      *> setTraceContextInTx traceCtx
      *> insertPGManualEvent tableName triggerName payload
|
|
|
|
-- | Determine the maintenance-mode version of a source by running
-- 'getMaintenanceModeVersionTx' in a read transaction; errors are rethrown
-- in @m@.
getMaintenanceModeVersion ::
  ( MonadIO m,
    MonadError QErr m
  ) =>
  SourceConfig ('Postgres pgKind) ->
  m MaintenanceModeVersion
getMaintenanceModeVersion sourceConfig =
  liftEitherM (liftIO (runPgSourceReadTx sourceConfig getMaintenanceModeVersionTx))
|
|
|
|
-- | Record a successful delivery: log the invocation and then mark the event
-- as delivered, both within a single 'runPgSourceWriteTx' transaction.
--
-- The error, if any, is returned rather than thrown.
recordSuccess ::
  (MonadIO m) =>
  SourceConfig ('Postgres pgKind) ->
  Event ('Postgres pgKind) ->
  Invocation 'EventType ->
  MaintenanceMode MaintenanceModeVersion ->
  m (Either QErr ())
recordSuccess sourceConfig event invocation maintenanceModeVersion =
  liftIO (runPgSourceWriteTx sourceConfig recordTx)
  where
    recordTx =
      insertInvocation (tmName (eTrigger event)) invocation
        *> setSuccessTx event maintenanceModeVersion
|
|
|
|
-- | Record a failed delivery together with its invocation.
--
-- This is 'recordError'' with the invocation always present.
recordError ::
  (MonadIO m) =>
  SourceConfig ('Postgres pgKind) ->
  Event ('Postgres pgKind) ->
  Invocation 'EventType ->
  ProcessEventError ->
  MaintenanceMode MaintenanceModeVersion ->
  m (Either QErr ())
recordError sourceConfig event invocation =
  recordError' sourceConfig event (Just invocation)
|
|
|
|
-- | Record a failed delivery.
--
-- Within a single 'runPgSourceWriteTx' transaction this logs the invocation
-- (when one is given) and then, depending on the 'ProcessEventError', either
-- schedules a retry or marks the event as errored. The error, if any, is
-- returned rather than thrown.
recordError' ::
  (MonadIO m) =>
  SourceConfig ('Postgres pgKind) ->
  Event ('Postgres pgKind) ->
  Maybe (Invocation 'EventType) ->
  ProcessEventError ->
  MaintenanceMode MaintenanceModeVersion ->
  m (Either QErr ())
recordError' sourceConfig event invocation processEventError maintenanceModeVersion =
  liftIO . runPgSourceWriteTx sourceConfig $
    logInvocation *> applyOutcome
  where
    logInvocation =
      traverse_ (insertInvocation (tmName (eTrigger event))) invocation

    applyOutcome = case processEventError of
      PESetError -> setErrorTx event maintenanceModeVersion
      PESetRetry retryTime -> setRetryTx event retryTime maintenanceModeVersion
|
|
|
|
-- | Mark an existing event for redelivery by running 'redeliverEventTx'
-- through 'runPgSourceWriteTx'; errors are rethrown in @m@.
redeliverEvent ::
  (MonadIO m, MonadError QErr m) =>
  SourceConfig ('Postgres pgKind) ->
  EventId ->
  m ()
redeliverEvent sourceConfig eventId =
  liftEitherM . liftIO . runPgSourceWriteTx sourceConfig $
    redeliverEventTx eventId
|
|
|
|
-- | Drop the SQL trigger functions of an event trigger and archive all of its
-- logged events, in one 'runPgSourceWriteTx' transaction.
--
-- The table argument is not used by this backend.
dropTriggerAndArchiveEvents ::
  ( MonadIO m,
    MonadError QErr m
  ) =>
  SourceConfig ('Postgres pgKind) ->
  TriggerName ->
  QualifiedTable ->
  m ()
dropTriggerAndArchiveEvents sourceConfig triggerName _table =
  liftEitherM (liftIO (runPgSourceWriteTx sourceConfig dropAndArchiveTx))
  where
    dropAndArchiveTx = dropTriggerQ triggerName *> archiveEvents triggerName
|
|
|
|
-- | (Re)create the SQL triggers of an event trigger whose trigger function is
-- missing from the database.
--
-- For each of the INSERT, UPDATE and DELETE operations that the trigger
-- definition subscribes to, this checks whether the corresponding trigger
-- function exists and, if it doesn't, creates it via 'mkTrigger'. All of this
-- happens in a single 'runPgSourceWriteTx' transaction.
--
-- The primary key component of the third argument is not used.
createMissingSQLTriggers ::
  ( MonadIO m,
    MonadError QErr m,
    MonadBaseControl IO m,
    HasServerConfigCtx m,
    Backend ('Postgres pgKind)
  ) =>
  PGSourceConfig ->
  TableName ('Postgres pgKind) ->
  ([(ColumnInfo ('Postgres pgKind))], Maybe (PrimaryKey ('Postgres pgKind) (ColumnInfo ('Postgres pgKind)))) ->
  TriggerName ->
  TriggerOpsDef ('Postgres pgKind) ->
  m ()
createMissingSQLTriggers sourceConfig table (allCols, _) triggerName opsDefinition = do
  serverConfigCtx <- askServerConfigCtx
  liftEitherM $
    runPgSourceWriteTx sourceConfig $ do
      for_ (tdInsert opsDefinition) (doesSQLTriggerExist serverConfigCtx INSERT)
      for_ (tdUpdate opsDefinition) (doesSQLTriggerExist serverConfigCtx UPDATE)
      for_ (tdDelete opsDefinition) (doesSQLTriggerExist serverConfigCtx DELETE)
  where
    -- Creates the trigger for the operation unless its function already exists.
    doesSQLTriggerExist serverConfigCtx op opSpec = do
      let opTriggerName = pgTriggerName op triggerName
      -- The lookup is restricted to the @hdb_catalog@ schema (as in
      -- 'checkIfFunctionExistsQ'): that is the schema the trigger functions
      -- are referenced in by 'mkTrigger' and 'dropTriggerOp'. Without the
      -- restriction, an unrelated function with the same name in any other
      -- schema would wrongly suppress the creation of the trigger.
      doesOpTriggerFunctionExist <-
        runIdentity . PG.getRow
          <$> PG.withQE
            defaultTxErrorHandler
            [PG.sql|
              SELECT EXISTS
                ( SELECT 1
                  FROM pg_catalog.pg_proc
                  JOIN pg_namespace ON pg_catalog.pg_proc.pronamespace = pg_namespace.oid
                  WHERE proname = $1
                  AND pg_namespace.nspname = 'hdb_catalog'
                )
            |]
            (Identity opTriggerName)
            True
      unless doesOpTriggerFunctionExist $
        flip runReaderT serverConfigCtx $
          mkTrigger triggerName table allCols op opSpec
|
|
|
|
-- | Create the SQL triggers for every operation in the given trigger
-- definition, in one 'runPgSourceWriteTx' transaction.
--
-- The 'ServerConfigCtx' is supplied to 'mkAllTriggersQ' as its reader
-- environment. The primary key argument is not used. The error, if any, is
-- returned rather than thrown.
createTableEventTrigger ::
  (Backend ('Postgres pgKind), MonadIO m, MonadBaseControl IO m) =>
  ServerConfigCtx ->
  PGSourceConfig ->
  QualifiedTable ->
  [ColumnInfo ('Postgres pgKind)] ->
  TriggerName ->
  TriggerOpsDef ('Postgres pgKind) ->
  Maybe (PrimaryKey ('Postgres pgKind) (ColumnInfo ('Postgres pgKind))) ->
  m (Either QErr ())
createTableEventTrigger serverConfigCtx sourceConfig table columns triggerName opsDefinition _primaryKey =
  runPgSourceWriteTx sourceConfig $
    runReaderT
      (mkAllTriggersQ triggerName table columns opsDefinition)
      serverConfigCtx
|
|
|
|
-- | Drop the trigger functions of an event trigger for exactly the given set
-- of operations, in one 'runPgSourceWriteTx' transaction.
--
-- The table argument is not used by this backend.
dropDanglingSQLTrigger ::
  ( MonadIO m,
    MonadError QErr m
  ) =>
  SourceConfig ('Postgres pgKind) ->
  TriggerName ->
  QualifiedTable ->
  HashSet Ops ->
  m ()
dropDanglingSQLTrigger sourceConfig triggerName _ ops =
  liftEitherM . liftIO . runPgSourceWriteTx sourceConfig $
    for_ ops (dropTriggerOp triggerName)
|
|
|
|
-- | Rename a column inside an event trigger configuration.
--
-- Every occurrence of @oCol@ in the listen and delivery columns of the insert,
-- update and delete specs is replaced by @nCol@, but only when @table@ and
-- @refTable@ are the same table; otherwise the configuration is returned with
-- its columns unchanged. The manual-invocation setting is left as is.
updateColumnInEventTrigger ::
  QualifiedTable ->
  PGCol ->
  PGCol ->
  QualifiedTable ->
  EventTriggerConf ('Postgres pgKind) ->
  EventTriggerConf ('Postgres pgKind)
updateColumnInEventTrigger table oCol nCol refTable triggerConf =
  triggerConf {etcDefinition = renameInOpsDef (etcDefinition triggerConf)}
  where
    renameCol col
      | table == refTable && oCol == col = nCol
      | otherwise = col

    renameInColumns = \case
      SubCArray cols -> SubCArray (renameCol <$> cols)
      SubCStar -> SubCStar

    renameInOpSpec (SubscribeOpSpec listenCols deliveryCols) =
      SubscribeOpSpec (renameInColumns listenCols) (fmap renameInColumns deliveryCols)

    renameInOpsDef (TriggerOpsDef ins upd del man) =
      TriggerOpsDef
        (fmap renameInOpSpec ins)
        (fmap renameInOpSpec upd)
        (fmap renameInOpSpec del)
        man
|
|
|
|
-- | Unlock the given (non-empty) set of events in the source and return the
-- number of events reported as unlocked by 'unlockEventsTx'.
--
-- The error, if any, is returned rather than thrown.
unlockEventsInSource ::
  MonadIO m =>
  SourceConfig ('Postgres pgKind) ->
  NE.NESet EventId ->
  m (Either QErr Int)
unlockEventsInSource sourceConfig eventIds =
  liftIO . runPgSourceWriteTx sourceConfig . unlockEventsTx $ toList eventIds
|
|
|
|
-- | Check whether a trigger function exists for the given 'triggerName' for
-- any of the given operations.
--
-- We want to avoid creating event triggers with the same name since this will
-- cause undesired behaviour. Note that only SQL functions associated with SQL
-- triggers are dropped when "replace = true" is set in the event trigger
-- configuration. Hence, the best way to check if we should allow the creation
-- of a trigger with the name 'triggerName' is to check if any function with
-- such a name exists in the hdb_catalog.
--
-- For eg: If a create_event_trigger request comes with trigger name as
-- "triggerName" and there is already a trigger with "triggerName" in the
-- metadata, then
--
-- 1. When "replace = false", the function with name 'triggerName' exists
--    so the creation is not allowed
-- 2. When "replace = true", the function with name 'triggerName' is first
--    dropped, hence we are allowed to create the trigger with name
--    'triggerName'
checkIfTriggerExists ::
  (MonadIO m, MonadError QErr m) =>
  PGSourceConfig ->
  TriggerName ->
  HashSet Ops ->
  m Bool
checkIfTriggerExists sourceConfig triggerName ops =
  liftEitherM . liftIO . runPgSourceWriteTx sourceConfig $
    or <$> traverse (checkIfFunctionExistsQ triggerName) (HashSet.toList ops)
|
|
|
|
---- DATABASE QUERIES ---------------------
|
|
--
|
|
-- The API for our in-database work queue:
|
|
-------------------------------------------
|
|
|
|
-- | Log one invocation of an event trigger and bump the event's try counter.
--
-- First inserts a row into @hdb_catalog.event_invocation_logs@, then
-- increments @tries@ on the matching row of @hdb_catalog.event_log@.
insertInvocation :: TriggerName -> Invocation 'EventType -> PG.TxE QErr ()
insertInvocation tName invo = logInvocation *> incrementTries
  where
    eventId = iEventId invo

    invocationStatus :: Maybe Int64
    invocationStatus = fromIntegral <$> iStatus invo

    logInvocation =
      PG.unitQE
        defaultTxErrorHandler
        [PG.sql|
          INSERT INTO hdb_catalog.event_invocation_logs (event_id, trigger_name, status, request, response)
          VALUES ($1, $2, $3, $4, $5)
        |]
        ( eventId,
          triggerNameToTxt tName,
          invocationStatus,
          PG.ViaJSON (toJSON (iRequest invo)),
          PG.ViaJSON (toJSON (iResponse invo))
        )
        True

    incrementTries =
      PG.unitQE
        defaultTxErrorHandler
        [PG.sql|
          UPDATE hdb_catalog.event_log

          SET tries = tries + 1
          WHERE id = $1
        |]
        (Identity eventId)
        True
|
|
|
|
-- | Insert a manual ('MANUAL' operation) event for the given table and trigger
-- by calling the PG function @hdb_catalog.insert_event_log@, returning the id
-- of the inserted event.
insertPGManualEvent ::
  QualifiedTable ->
  TriggerName ->
  Value ->
  PG.TxE QErr EventId
insertPGManualEvent (QualifiedObject schemaName tableName) triggerName rowData =
  fmap (runIdentity . PG.getRow) $
    PG.withQE
      defaultTxErrorHandler
      [PG.sql|
        SELECT hdb_catalog.insert_event_log($1, $2, $3, $4, $5)
      |]
      (schemaName, tableName, triggerName, tshow MANUAL, PG.ViaJSON rowData)
      False
|
|
|
|
-- | Mark every logged event of the given trigger as archived.
archiveEvents :: TriggerName -> PG.TxE QErr ()
archiveEvents triggerName =
  PG.unitQE
    defaultTxErrorHandler
    [PG.sql|
      UPDATE hdb_catalog.event_log
      SET archived = 't'
      WHERE trigger_name = $1
    |]
    (Identity triggerName)
    False
|
|
|
|
-- | Map the catalog version found in the user's database to the
-- maintenance-mode version to operate in.
--
-- Catalog version 40 maps to 'PreviousMMVersion'; version 43 and the latest
-- catalog version map to 'CurrentMMVersion'. Any other catalog version results
-- in a 500 error, since maintenance mode is not supported for it.
getMaintenanceModeVersionTx :: PG.TxE QErr MaintenanceModeVersion
getMaintenanceModeVersionTx = liftTx $ do
  catalogVersion <- getCatalogVersion -- From the user's DB
  -- the previous version and the current version will change depending
  -- upon between which versions we need to support maintenance mode
  if
      | catalogVersion == MetadataCatalogVersion 40 -> pure PreviousMMVersion
      -- The catalog is migrated to the 43rd version for a source
      -- which was initialised by a v1 graphql-engine instance (See @initSource@).
      | catalogVersion == MetadataCatalogVersion 43 -> pure CurrentMMVersion
      | catalogVersion == latestCatalogVersion -> pure CurrentMMVersion
      | otherwise ->
          throw500 $
            "Maintenance mode is only supported with catalog versions: 40, 43 and "
              -- 'latestCatalogVersionString' is appended as is: passing it
              -- through 'tshow' would wrap the version in literal double
              -- quotes in the error message.
              <> latestCatalogVersionString
|
|
|
|
-- | Lock and return events not yet being processed or completed, up to some
-- limit. Process events approximately in created_at order, but we make no
-- ordering guarantees; events can and will race. Nevertheless we want to
-- ensure newer change events don't starve older ones.
--
-- Only events belonging to the given triggers are considered, and an event
-- whose lock is older than 30 minutes is treated as unlocked.
fetchEvents :: SourceName -> [TriggerName] -> FetchBatchSize -> PG.TxE QErr [Event ('Postgres pgKind)]
fetchEvents source triggerNames (FetchBatchSize fetchBatchSize) =
  fmap (map toEvent) $
    PG.withQE
      defaultTxErrorHandler
      [PG.sql|
        UPDATE hdb_catalog.event_log
        SET locked = NOW()
        WHERE id IN ( SELECT l.id
                      FROM hdb_catalog.event_log l
                      WHERE l.delivered = 'f' and l.error = 'f'
                            and (l.locked IS NULL or l.locked < (NOW() - interval '30 minute'))
                            and (l.next_retry_at is NULL or l.next_retry_at <= now())
                            and l.archived = 'f'
                            and l.trigger_name = ANY($2)
                      /* NB: this ordering is important for our index `event_log_fetch_events` */
                      /* (see `init_pg_source.sql`) */
                      ORDER BY locked NULLS FIRST, next_retry_at NULLS FIRST, created_at
                      LIMIT $1
                      FOR UPDATE SKIP LOCKED )
        RETURNING id, schema_name, table_name, trigger_name, payload::json, tries, created_at
      |]
      (batchLimit, PGTextArray (map triggerNameToTxt triggerNames))
      True
  where
    batchLimit :: Word64
    batchLimit = fromIntegral fetchBatchSize

    -- Build an 'Event' from one returned row; the source name is not stored
    -- in the row and comes from the function argument instead.
    toEvent (eventId, schemaName, tableName, triggerName, PG.ViaJSON payload, tries, createdAt) =
      Event
        { eId = eventId,
          eSource = source,
          eTable = QualifiedObject schemaName tableName,
          eTrigger = TriggerMetadata triggerName,
          eEvent = payload,
          eTries = tries,
          eCreatedAt = createdAt
        }
|
|
|
|
-- | Variant of 'fetchEvents' used while maintenance mode is enabled.
--
-- For 'CurrentMMVersion' this is exactly 'fetchEvents'. For
-- 'PreviousMMVersion' a query is used in which @locked@ is a boolean flag
-- rather than a timestamp; that query does not filter by trigger name and
-- always reports the default source.
fetchEventsMaintenanceMode :: SourceName -> [TriggerName] -> FetchBatchSize -> MaintenanceModeVersion -> PG.TxE QErr [Event ('Postgres pgKind)]
fetchEventsMaintenanceMode sourceName triggerNames fetchBatchSize maintenanceModeVersion =
  case maintenanceModeVersion of
    CurrentMMVersion -> fetchEvents sourceName triggerNames fetchBatchSize
    PreviousMMVersion ->
      fmap (map toEvent) $
        PG.withQE
          defaultTxErrorHandler
          [PG.sql|
            UPDATE hdb_catalog.event_log
            SET locked = 't'
            WHERE id IN ( SELECT l.id
                          FROM hdb_catalog.event_log l
                          WHERE l.delivered = 'f' and l.error = 'f' and l.locked = 'f'
                                and (l.next_retry_at is NULL or l.next_retry_at <= now())
                                and l.archived = 'f'
                          ORDER BY created_at
                          LIMIT $1
                          FOR UPDATE SKIP LOCKED )
            RETURNING id, schema_name, table_name, trigger_name, payload::json, tries, created_at
          |]
          (Identity batchLimit)
          True
  where
    batchLimit :: Word64
    batchLimit = fromIntegral (_unFetchBatchSize fetchBatchSize)

    toEvent (eventId, schemaName, tableName, triggerName, PG.ViaJSON payload, tries, createdAt) =
      Event
        { eId = eventId,
          eSource = SNDefault, -- in v1, there'll only be the default source
          eTable = QualifiedObject schemaName tableName,
          eTrigger = TriggerMetadata triggerName,
          eEvent = payload,
          eTries = tries,
          eCreatedAt = createdAt
        }
|
|
|
|
-- | Mark an event as delivered, clear its retry time and release its lock.
--
-- The lock is released by setting @locked@ to @'f'@ for 'PreviousMMVersion'
-- and to @NULL@ otherwise.
setSuccessTx :: Event ('Postgres pgKind) -> MaintenanceMode MaintenanceModeVersion -> PG.TxE QErr ()
setSuccessTx e maintenanceMode =
  PG.unitQE defaultTxErrorHandler markDeliveredQuery (Identity (eId e)) True
  where
    markDeliveredQuery = case maintenanceMode of
      MaintenanceModeDisabled -> latestVersionQuery
      MaintenanceModeEnabled CurrentMMVersion -> latestVersionQuery
      MaintenanceModeEnabled PreviousMMVersion ->
        [PG.sql|
          UPDATE hdb_catalog.event_log
          SET delivered = 't', next_retry_at = NULL, locked = 'f'
          WHERE id = $1
        |]

    latestVersionQuery =
      [PG.sql|
        UPDATE hdb_catalog.event_log
        SET delivered = 't', next_retry_at = NULL, locked = NULL
        WHERE id = $1
      |]
|
|
|
|
-- | Mark an event as errored, clear its retry time and release its lock.
--
-- The lock is released by setting @locked@ to @'f'@ for 'PreviousMMVersion'
-- and to @NULL@ otherwise.
setErrorTx :: Event ('Postgres pgKind) -> MaintenanceMode MaintenanceModeVersion -> PG.TxE QErr ()
setErrorTx e maintenanceMode =
  PG.unitQE defaultTxErrorHandler markErroredQuery (Identity (eId e)) True
  where
    markErroredQuery = case maintenanceMode of
      MaintenanceModeDisabled -> latestVersionQuery
      MaintenanceModeEnabled CurrentMMVersion -> latestVersionQuery
      MaintenanceModeEnabled PreviousMMVersion ->
        [PG.sql|
          UPDATE hdb_catalog.event_log
          SET error = 't', next_retry_at = NULL, locked = 'f'
          WHERE id = $1
        |]

    latestVersionQuery =
      [PG.sql|
        UPDATE hdb_catalog.event_log
        SET error = 't', next_retry_at = NULL, locked = NULL
        WHERE id = $1
      |]
|
|
|
|
-- | Set the next retry time of an event and release its lock.
--
-- The lock is released by setting @locked@ to @'f'@ for 'PreviousMMVersion'
-- and to @NULL@ otherwise.
setRetryTx :: Event ('Postgres pgKind) -> Time.UTCTime -> MaintenanceMode MaintenanceModeVersion -> PG.TxE QErr ()
setRetryTx e time maintenanceMode =
  PG.unitQE defaultTxErrorHandler scheduleRetryQuery (time, eId e) True
  where
    scheduleRetryQuery = case maintenanceMode of
      MaintenanceModeDisabled -> latestVersionQuery
      MaintenanceModeEnabled CurrentMMVersion -> latestVersionQuery
      MaintenanceModeEnabled PreviousMMVersion ->
        [PG.sql|
          UPDATE hdb_catalog.event_log
          SET next_retry_at = $1, locked = 'f'
          WHERE id = $2
        |]

    latestVersionQuery =
      [PG.sql|
        UPDATE hdb_catalog.event_log
        SET next_retry_at = $1, locked = NULL
        WHERE id = $2
      |]
|
|
|
|
-- | Drop the trigger functions of an event trigger for the INSERT, UPDATE and
-- DELETE operations, in that order.
dropTriggerQ :: TriggerName -> PG.TxE QErr ()
dropTriggerQ trn =
  for_ [INSERT, UPDATE, DELETE] (dropTriggerOp trn)
|
|
|
|
-- | Drop the @hdb_catalog@ trigger function of an event trigger for a single
-- operation, if it exists.
--
-- The function is dropped with @CASCADE@.
dropTriggerOp :: TriggerName -> Ops -> PG.TxE QErr ()
dropTriggerOp triggerName triggerOp =
  PG.unitQE defaultTxErrorHandler (PG.fromText dropFunctionSql) () False
  where
    dropFunctionSql :: Text
    dropFunctionSql =
      mconcat
        [ "DROP FUNCTION IF EXISTS",
          " hdb_catalog.",
          unQualifiedTriggerName (pgIdenTrigger triggerOp triggerName),
          "()",
          " CASCADE"
        ]
|
|
|
|
-- | Ensure that the event exists and is not currently being processed.
--
-- Fails with a 400 'NotExists' error when no event has the given id, and with
-- a 400 'Busy' error when the event holds a lock that is less than 30 minutes
-- old.
checkEvent :: EventId -> PG.TxE QErr ()
checkEvent eid = do
  lockStates <-
    PG.withQE
      defaultTxErrorHandler
      [PG.sql|
        SELECT l.locked IS NOT NULL AND l.locked >= (NOW() - interval '30 minute')
        FROM hdb_catalog.event_log l
        WHERE l.id = $1
      |]
      (Identity eid)
      True
  case lockStates of
    [] -> throw400 NotExists "event not found"
    (Identity isLocked : _) ->
      when isLocked $
        throw400 Busy "event is already being processed"
|
|
|
|
-- | Make an event eligible for delivery again by clearing its @delivered@ and
-- @error@ flags and resetting its @tries@ counter to zero.
markForDelivery :: EventId -> PG.TxE QErr ()
markForDelivery eventId =
  PG.unitQE
    defaultTxErrorHandler
    [PG.sql|
      UPDATE hdb_catalog.event_log
      SET
      delivered = 'f',
      error = 'f',
      tries = 0
      WHERE id = $1
    |]
    (Identity eventId)
    True
|
|
|
|
-- | Redeliver an event: verify with 'checkEvent' that it exists and is not
-- being processed, then reset it with 'markForDelivery'.
redeliverEventTx :: EventId -> PG.TxE QErr ()
redeliverEventTx eventId =
  checkEvent eventId *> markForDelivery eventId
|
|
|
|
-- | 'unlockEventsTx' takes a list of 'EventId's, unlocks those of them that
-- are currently locked and returns how many events were unlocked. This
-- function is called when a graceful shutdown is initiated.
unlockEventsTx :: [EventId] -> PG.TxE QErr Int
unlockEventsTx eventIds =
  fmap (runIdentity . PG.getRow) $
    PG.withQE
      defaultTxErrorHandler
      [PG.sql|
        WITH "cte" AS
        (UPDATE hdb_catalog.event_log
        SET locked = NULL
        WHERE id = ANY($1::text[])
        -- only unlock those events that have been locked, it's possible
        -- that an event has been processed but not yet been removed from
        -- the saved locked events, which will lead to a double send
        AND locked IS NOT NULL
        RETURNING *)
        SELECT count(*) FROM "cte"
      |]
      (Identity (PGTextArray (unEventId <$> eventIds)))
      True
|
|
|
|
---- Postgres event trigger utility functions ---------------------
|
|
|
|
-- | 'QualifiedTriggerName' stores the name of the SQL trigger (and of its
-- trigger function) that backs an event trigger for one operation.
--
-- An example of it is @"notify_hasura_users_all_INSERT"@, where @users_all@
-- is the name of the event trigger.
newtype QualifiedTriggerName = QualifiedTriggerName
  { unQualifiedTriggerName :: Text
  }
  deriving (Show, Eq, PG.ToPrepArg)
|
|
|
|
-- | Build the name of the SQL trigger for an event trigger and operation:
-- @notify_hasura_<trigger name>_<operation>@.
pgTriggerName :: Ops -> TriggerName -> QualifiedTriggerName
pgTriggerName op trn =
  QualifiedTriggerName
    ("notify_hasura_" <> triggerNameToTxt trn <> "_" <> tshow op)
|
|
|
|
-- | 'pgIdenTrigger' is used to construct the name of the pg function used for
-- event triggers which are present in the hdb_catalog schema.
--
-- It is the name produced by 'pgTriggerName', passed through
-- 'pgFmtIdentifier' so that it can be spliced into SQL text.
pgIdenTrigger :: Ops -> TriggerName -> QualifiedTriggerName
pgIdenTrigger op triggerName =
  QualifiedTriggerName
    (pgFmtIdentifier (unQualifiedTriggerName (pgTriggerName op triggerName)))
|
|
|
|
-- | Define the pgSQL trigger functions on database events.
--
-- Renders the @src-rsr/trigger.sql.shakespeare@ template for the given event
-- trigger and operation, executes the resulting SQL and returns the name of
-- the trigger function.
--
-- NOTE(review): the template presumably interpolates the let-bound names
-- below (@name@, @qualifiedTriggerName@, @oldRow@, ...) by name; check the
-- template before renaming any of them.
mkTriggerFunctionQ ::
  forall pgKind m.
  (Backend ('Postgres pgKind), MonadTx m, MonadReader ServerConfigCtx m) =>
  TriggerName ->
  QualifiedTable ->
  [ColumnInfo ('Postgres pgKind)] ->
  Ops ->
  SubscribeOpSpec ('Postgres pgKind) ->
  m QualifiedTriggerName
mkTriggerFunctionQ triggerName (QualifiedObject schema table) allCols op (SubscribeOpSpec listenColumns deliveryColumns') = do
  strfyNum <- stringifyNum . _sccSQLGenCtx <$> ask
  let dbQualifiedTriggerName = pgIdenTrigger op triggerName
      triggerFunctionSql =
        let -- If there are no specific delivery columns selected by user then all the columns will be delivered
            -- in payload hence 'SubCStar'.
            deliveryColumns = fromMaybe SubCStar deliveryColumns'
            getApplicableColumns = \case
              SubCStar -> allCols
              SubCArray cols -> getColInfos cols allCols

            -- Columns that should be present in the payload. By default, all columns are present.
            applicableDeliveryCols = getApplicableColumns deliveryColumns
            getRowExpression opVar = applyRowToJson' (mkRowExpression opVar strfyNum applicableDeliveryCols)

            -- Columns that user subscribed to listen for changes. By default, we listen on all columns.
            applicableListenCols = getApplicableColumns listenColumns
            renderRow opVar = applyRow (mkRowExpression opVar strfyNum applicableListenCols)

            -- The old row is only part of the payload for UPDATE and DELETE.
            oldDataExp = case op of
              INSERT -> SENull
              UPDATE -> getRowExpression OLD
              DELETE -> getRowExpression OLD
              MANUAL -> SENull
            -- The new row is only part of the payload for INSERT and UPDATE.
            newDataExp = case op of
              INSERT -> getRowExpression NEW
              UPDATE -> getRowExpression NEW
              DELETE -> SENull
              MANUAL -> SENull

            name = triggerNameToTxt triggerName
            qualifiedTriggerName = unQualifiedTriggerName dbQualifiedTriggerName
            schemaName = pgFmtLit (getSchemaTxt schema)
            tableName = pgFmtLit (getTableTxt table)

            oldRow = toSQLTxt (renderRow OLD)
            newRow = toSQLTxt (renderRow NEW)
            oldPayloadExpression = toSQLTxt oldDataExp
            newPayloadExpression = toSQLTxt newDataExp
         in $(makeRelativeToProject "src-rsr/trigger.sql.shakespeare" >>= ST.stextFile)
  () <- liftTx (PG.multiQE defaultTxErrorHandler (PG.fromText (TL.toStrict triggerFunctionSql)))
  pure dbQualifiedTriggerName
  where
    applyRowToJson' e = SEFnApp "row_to_json" [e] Nothing
    applyRow e = SEFnApp "row" [e] Nothing
    opToQual = QualVar . tshow

    mkRowExpression opVar strfyNum columns =
      mkRowExp [toExtractor (mkQId opVar strfyNum col) col | col <- columns]

    mkQId opVar strfyNum colInfo =
      toJSONableExp strfyNum (ciType colInfo) False Nothing
        . SEQIdentifier
        . QIdentifier (opToQual opVar)
        . toIdentifier
        $ ciColumn colInfo

    -- Generate the SQL expression
    --
    -- If the column type is either 'Geography' or 'Geometry', then after applying the 'ST_AsGeoJSON' function
    -- to the column, alias the value of the expression with the column name else it uses `st_asgeojson` as
    -- the column name.
    toExtractor sqlExp column =
      Extractor sqlExp $
        if isScalarColumnWhere isGeoType (ciType column)
          then Just (getAlias column)
          else Nothing

    getAlias col = toColumnAlias (Identifier (getPGColTxt (ciColumn col)))
|
|
|
|
-- | Check whether a non-internal SQL trigger with the given name exists on
-- the given table.
--
-- 'regclass' converts non-quoted strings to lowercase but since identifiers
-- such as the table name are case-sensitive, the schema and table names are
-- quoted using 'quote_ident' in the query.
-- Ref: https://www.postgresql.org/message-id/3896142.1620136761%40sss.pgh.pa.us
checkIfTriggerExistsForTableQ ::
  QualifiedTriggerName ->
  QualifiedTable ->
  PG.TxE QErr Bool
checkIfTriggerExistsForTableQ (QualifiedTriggerName triggerName) (QualifiedObject schemaName tableName) =
  runIdentity . PG.getRow
    <$> PG.withQE
      defaultTxErrorHandler
      [PG.sql|
        SELECT EXISTS (
          SELECT 1
          FROM pg_trigger
          WHERE NOT tgisinternal
          AND tgname = $1 AND tgrelid = (quote_ident($2) || '.' || quote_ident($3))::regclass
          )
      |]
      (triggerName, schemaName, tableName)
      True
|
|
|
|
-- | Check whether the trigger function of the given event trigger and
-- operation exists in the @hdb_catalog@ schema.
checkIfFunctionExistsQ ::
  TriggerName ->
  Ops ->
  PG.TxE QErr Bool
checkIfFunctionExistsQ triggerName op =
  runIdentity . PG.getRow
    <$> PG.withQE
      defaultTxErrorHandler
      [PG.sql|
        SELECT EXISTS (
          SELECT 1
          FROM pg_catalog.pg_proc
          JOIN pg_namespace ON pg_catalog.pg_proc.pronamespace = pg_namespace.oid
          WHERE proname = $1
          AND pg_namespace.nspname = 'hdb_catalog'
          )
      |]
      (Identity (pgTriggerName op triggerName))
      True
|
|
|
|
-- | Set up the SQL trigger of an event trigger for a single operation.
--
-- The trigger function is always (re)defined via 'mkTriggerFunctionQ'. The
-- SQL trigger itself is only created when no trigger with that name exists on
-- the table yet.
mkTrigger ::
  forall pgKind m.
  (Backend ('Postgres pgKind), MonadTx m, MonadReader ServerConfigCtx m) =>
  TriggerName ->
  QualifiedTable ->
  [ColumnInfo ('Postgres pgKind)] ->
  Ops ->
  SubscribeOpSpec ('Postgres pgKind) ->
  m ()
mkTrigger triggerName table allCols op subOpSpec = do
  -- create/replace the trigger function
  QualifiedTriggerName triggerNameTxt <- mkTriggerFunctionQ triggerName table allCols op subOpSpec
  -- the SQL trigger is created only if it doesn't exist on the table already
  triggerAlreadyExists <- liftTx (checkIfTriggerExistsForTableQ (pgTriggerName op triggerName) table)
  unless triggerAlreadyExists . liftTx $
    PG.unitQE defaultTxErrorHandler (PG.fromText (createTriggerSQL triggerNameTxt)) () False
  where
    tableName = toSQLTxt table
    opText = tshow op

    createTriggerSQL triggerNameTxt =
      [ST.st|
          CREATE TRIGGER #{triggerNameTxt} AFTER #{opText} ON #{tableName} FOR EACH ROW EXECUTE PROCEDURE hdb_catalog.#{triggerNameTxt}()
        |]
|
|
|
|
-- | Set up the SQL triggers for each of the INSERT, UPDATE and DELETE
-- operations (in that order) that the given trigger definition subscribes to.
mkAllTriggersQ ::
  forall pgKind m.
  (Backend ('Postgres pgKind), MonadTx m, MonadReader ServerConfigCtx m) =>
  TriggerName ->
  QualifiedTable ->
  [ColumnInfo ('Postgres pgKind)] ->
  TriggerOpsDef ('Postgres pgKind) ->
  m ()
mkAllTriggersQ triggerName table allCols fullspec =
  for_ subscribedOps $ \(op, maybeOpSpec) ->
    for_ maybeOpSpec (mkTrigger triggerName table allCols op)
  where
    subscribedOps =
      [ (INSERT, tdInsert fullspec),
        (UPDATE, tdUpdate fullspec),
        (DELETE, tdDelete fullspec)
      ]
|
|
|
|
-- | Add cleanup logs for given trigger names and cleanup configs. This will perform the following steps:
--
-- 1. Get the count of scheduled cleanup events and the last scheduled
--    timestamp of each trigger.
-- 2. If the count is less than 5, then add more cleanup logs (continuing from
--    the last scheduled timestamp), else do nothing. A trigger without any
--    scheduled cleanup event gets schedules starting from the current time.
--
-- Nothing is done when the given list is empty.
addCleanupSchedules ::
  (MonadIO m, MonadError QErr m) =>
  PGSourceConfig ->
  [(TriggerName, AutoTriggerLogCleanupConfig)] ->
  m ()
addCleanupSchedules sourceConfig triggersWithcleanupConfig =
  unless (null triggersWithcleanupConfig) $ do
    countAndLastSchedules <-
      liftEitherM . liftIO . runPgSourceReadTx sourceConfig $
        selectLastCleanupScheduledTimestamp (fst <$> triggersWithcleanupConfig)
    currTime <- liftIO Time.getCurrentTime
    let scheduledByTrigger =
          Map.fromList
            [ (triggerName, (count, lastTime))
              | (triggerName, count, lastTime) <- countAndLastSchedules
            ]
        -- The timestamp to generate new schedules from, or 'Nothing' when the
        -- trigger already has enough scheduled cleanup events.
        scheduleStartFor triggerName =
          case Map.lookup triggerName scheduledByTrigger of
            Nothing -> Just currTime
            Just (count, lastTime)
              | count < 5 -> Just lastTime
              | otherwise -> Nothing
        scheduledTriggersAndTimestamps =
          [ ( triggerName,
              generateScheduleTimes startTime cleanupSchedulesToBeGenerated (_atlccSchedule cleanupConfig)
            )
            | (triggerName, cleanupConfig) <- triggersWithcleanupConfig,
              Just startTime <- [scheduleStartFor triggerName]
          ]
    unless (null scheduledTriggersAndTimestamps) $
      liftEitherM . liftIO . runPgSourceWriteTx sourceConfig $
        insertEventTriggerCleanupLogsTx scheduledTriggersAndTimestamps
|
|
|
|
-- | Insert the cleanup logs for the given trigger names and schedules.
--
-- One row with status @scheduled@ is inserted into
-- @hdb_catalog.hdb_event_log_cleanups@ for every (trigger, schedule time)
-- pair; rows that conflict with existing ones are skipped.
--
-- When there is nothing to insert (no triggers, or no schedule times for any
-- of them) no statement is executed at all: an @INSERT@ with an empty
-- @VALUES@ list is not valid SQL and would fail the whole transaction.
insertEventTriggerCleanupLogsTx :: [(TriggerName, [Time.UTCTime])] -> PG.TxET QErr IO ()
insertEventTriggerCleanupLogsTx triggersWithschedules =
  unless (null cleanupRows) $
    PG.unitQE defaultTxErrorHandler (PG.fromText insertCleanupEventsSql) () False
  where
    cleanupLogTable = QualifiedObject "hdb_catalog" "hdb_event_log_cleanups"

    -- One tuple per (trigger, schedule time) pair.
    cleanupRows = concatMap genArr triggersWithschedules
    genArr (t, schedules) = map (toTupleExp . (\s -> [(triggerNameToTxt t), (formatTime' s), "scheduled"])) schedules
    toTupleExp = S.TupleExp . map S.SELit

    insertCleanupEventsSql =
      TB.run $
        toSQL
          S.SQLInsert
            { siTable = cleanupLogTable,
              siCols = map unsafePGCol ["trigger_name", "scheduled_at", "status"],
              siValues = S.ValuesExp cleanupRows,
              siConflict = Just $ S.DoNothing Nothing,
              siRet = Nothing
            }
|
|
|
|
-- | For each of the given event trigger names that has cleanup logs in
-- @scheduled@ status, get the number of such logs together with the latest
-- scheduled timestamp.
selectLastCleanupScheduledTimestamp :: [TriggerName] -> PG.TxET QErr IO [(TriggerName, Int, Time.UTCTime)]
selectLastCleanupScheduledTimestamp triggerNames =
  PG.withQE
    defaultTxErrorHandler
    [PG.sql|
      SELECT trigger_name, count(1), max(scheduled_at)
      FROM hdb_catalog.hdb_event_log_cleanups
      WHERE status='scheduled' AND trigger_name = ANY($1::text[])
      GROUP BY trigger_name
    |]
    triggerNamesArg
    True
  where
    triggerNamesArg = Identity (PGTextArray (map triggerNameToTxt triggerNames))
|
|
|
|
-- | Delete every cleanup log of the given event trigger that is still in
-- @scheduled@ status.
deleteAllScheduledCleanupsTx :: TriggerName -> PG.TxE QErr ()
deleteAllScheduledCleanupsTx triggerName =
  PG.unitQE
    defaultTxErrorHandler
    [PG.sql|
      DELETE from hdb_catalog.hdb_event_log_cleanups
      WHERE (status = 'scheduled') AND (trigger_name = $1)
    |]
    triggerNameArg
    True
  where
    triggerNameArg = Identity $ triggerNameToTxt triggerName
|
|
|
|
-- | @deleteAllScheduledCleanups@ deletes all scheduled cleanup logs for a given event trigger.
--
-- Runs 'deleteAllScheduledCleanupsTx' through 'runPgSourceWriteTx' on the
-- given source and lifts the resulting @Either QErr@ into @m@.
deleteAllScheduledCleanups ::
  (MonadIO m, MonadError QErr m) =>
  PGSourceConfig ->
  TriggerName ->
  m ()
deleteAllScheduledCleanups sourceConfig triggerName =
  liftEitherM . liftIO . runPgSourceWriteTx sourceConfig $
    deleteAllScheduledCleanupsTx triggerName
|
|
|
|
-- | Fetch the cleanup logs that are due, as @(id, trigger_name)@ pairs.
--
-- The query considers the cleanup logs in @scheduled@ status whose
-- @scheduled_at@ is earlier than the current UTC time. Per trigger name, the
-- log with the latest @scheduled_at@ among those is returned, while the
-- @scheduled@ logs of that trigger with an earlier @scheduled_at@ are set to
-- @dead@ in the same statement.
getCleanupEventsForDeletionTx :: PG.TxE QErr [(Text, TriggerName)]
getCleanupEventsForDeletionTx =
  PG.withQE
    defaultTxErrorHandler
    [PG.sql|
      WITH latest_events as (
        SELECT * from hdb_catalog.hdb_event_log_cleanups WHERE status = 'scheduled' AND scheduled_at < (now() at time zone 'utc')
      ),
      grouped_events as (
        SELECT trigger_name, max(scheduled_at) as scheduled_at
        from latest_events
        group by trigger_name
      ),
      mark_events_as_dead as (
        UPDATE hdb_catalog.hdb_event_log_cleanups l
        SET status = 'dead'
        FROM grouped_events AS g
        WHERE l.trigger_name = g.trigger_name AND l.scheduled_at < g.scheduled_at AND l.status = 'scheduled'
      )
      SELECT l.id, l.trigger_name
      FROM latest_events l
      JOIN grouped_events g ON l.trigger_name = g.trigger_name
      WHERE l.scheduled_at = g.scheduled_at;
    |]
    ()
    False
|
|
|
|
-- | @getCleanupEventsForDeletion@ returns the cleanup logs that are to be deleted,
-- as pairs of cleanup log id and trigger name.
--
-- This will perform the following steps:
--
--   1. Get the scheduled cleanup events that were scheduled before current time.
--   2. If there are multiple entries for the same trigger name with different scheduled time,
--      then fetch the latest entry and mark others as dead.
--
-- Since step 2 updates rows, the transaction is run with 'runPgSourceWriteTx'.
getCleanupEventsForDeletion ::
  (MonadIO m, MonadError QErr m) =>
  PGSourceConfig ->
  m [(Text, TriggerName)]
getCleanupEventsForDeletion sourceConfig =
  liftEitherM . liftIO $
    runPgSourceWriteTx sourceConfig getCleanupEventsForDeletionTx
|
|
|
|
-- | Set the status of the cleanup logs with the given ids to @dead@.
-- No statement is executed when the list of ids is empty.
markCleanupEventsAsDeadTx :: [Text] -> PG.TxE QErr ()
markCleanupEventsAsDeadTx toDeadEvents =
  unless (null toDeadEvents) markAsDead
  where
    markAsDead =
      PG.unitQE
        defaultTxErrorHandler
        [PG.sql|
          UPDATE hdb_catalog.hdb_event_log_cleanups l
          SET status = 'dead'
          WHERE id = ANY($1::text[])
        |]
        (Identity (PGTextArray toDeadEvents))
        True
|
|
|
|
-- unitQueryE HGE.defaultMSSQLTxErrorHandler $
|
|
-- rawUnescapedText . LT.toStrict $
|
|
-- $(makeRelativeToProject "src-rsr/mssql/event_logs_cleanup_sqls/mssql_update_events_to_dead.sql.shakespeare" >>= ST.stextFile)
|
|
|
|
-- | @updateCleanupEventStatusToDead@ sets the status of the cleanup logs with
-- the given ids to @dead@, by running 'markCleanupEventsAsDeadTx' through
-- 'runPgSourceWriteTx' on the given source.
updateCleanupEventStatusToDead ::
  (MonadIO m, MonadError QErr m) =>
  PGSourceConfig ->
  [Text] ->
  m ()
updateCleanupEventStatusToDead sourceConfig toDeadEvents =
  liftEitherM . liftIO . runPgSourceWriteTx sourceConfig $
    markCleanupEventsAsDeadTx toDeadEvents
|
|
|
|
-- | Set the status of the cleanup log with the given id to @paused@.
updateCleanupEventStatusToPausedTx :: Text -> PG.TxE QErr ()
updateCleanupEventStatusToPausedTx cleanupLogId =
  PG.unitQE
    defaultTxErrorHandler
    [PG.sql|
      UPDATE hdb_catalog.hdb_event_log_cleanups
      SET status = 'paused'
      WHERE id = $1
    |]
    cleanupLogIdArg
    True
  where
    cleanupLogIdArg = Identity cleanupLogId
|
|
|
|
-- | @updateCleanupEventStatusToPaused@ updates the cleanup log status to `paused` if the event trigger configuration is paused.
--
-- The update itself is unconditional for the given cleanup log id; it runs
-- 'updateCleanupEventStatusToPausedTx' through 'runPgSourceWriteTx'.
updateCleanupEventStatusToPaused ::
  (MonadIO m, MonadError QErr m) =>
  PGSourceConfig ->
  Text ->
  m ()
updateCleanupEventStatusToPaused sourceConfig cleanupLogId =
  liftEitherM . liftIO . runPgSourceWriteTx sourceConfig $
    updateCleanupEventStatusToPausedTx cleanupLogId
|
|
|
|
-- | Set the status of the cleanup log with the given id to @completed@ and
-- record the number of deleted event logs and event invocation logs on it.
updateCleanupEventStatusToCompletedTx :: Text -> DeletedEventLogStats -> PG.TxE QErr ()
updateCleanupEventStatusToCompletedTx cleanupLogId (DeletedEventLogStats numEventLogs numInvocationLogs) =
  PG.unitQE
    defaultTxErrorHandler
    [PG.sql|
      UPDATE hdb_catalog.hdb_event_log_cleanups
      SET status = 'completed', deleted_event_logs = $2 , deleted_event_invocation_logs = $3
      WHERE id = $1
    |]
    (cleanupLogId, delLogs, delInvLogs)
    True
  where
    -- Both counters are passed to the query as 64-bit integers.
    delLogs, delInvLogs :: Int64
    delLogs = fromIntegral numEventLogs
    delInvLogs = fromIntegral numInvocationLogs
|
|
|
|
-- | @updateCleanupEventStatusToCompleted@ updates the cleanup log status after the event logs are deleted.
-- This will perform the following steps:
--
--   1. Updates the cleanup log status to `completed`.
--   2. Updates the number of event logs and event invocation logs that were deleted,
--      as given by the 'DeletedEventLogStats'.
updateCleanupEventStatusToCompleted ::
  (MonadIO m, MonadError QErr m) =>
  PGSourceConfig ->
  Text ->
  DeletedEventLogStats ->
  m ()
updateCleanupEventStatusToCompleted sourceConfig cleanupLogId delStats =
  liftEitherM . liftIO . runPgSourceWriteTx sourceConfig $
    updateCleanupEventStatusToCompletedTx cleanupLogId delStats
|
|
|
|
-- | Delete one batch of event logs of a trigger, as described by the given
-- cleanup config, and return the number of event logs and event invocation
-- logs that were deleted.
--
-- The steps, in order:
--
--   1. Set @statement_timeout@ from the configured timeout.
--   2. Select up to one batch of event ids of the trigger that are delivered
--      or errored, older than the retention period and not locked.
--   3. Mark those events as locked.
--   4. Either delete their invocation logs or re-set the trigger name on them.
--   5. Delete the events themselves.
--   6. Reset @statement_timeout@ to 0.
deleteEventTriggerLogsTx :: TriggerLogCleanupConfig -> PG.TxE QErr DeletedEventLogStats
deleteEventTriggerLogsTx TriggerLogCleanupConfig {..} = do
  -- Setting the timeout
  PG.unitQE defaultTxErrorHandler (PG.fromText ("SET statement_timeout = " <> tshow qTimeout)) () True
  -- Select all the dead events based on criteria set in the cleanup config.
  deadEventIDs <-
    map runIdentity
      <$> PG.withQE
        defaultTxErrorHandler
        ( PG.fromText
            [ST.st|
              SELECT id FROM hdb_catalog.event_log
              WHERE ((delivered = true OR error = true) AND trigger_name = $1)
              AND created_at < now() - interval '#{qRetentionPeriod}'
              AND locked IS NULL
              LIMIT $2
            |]
        )
        (qTriggerName, qBatchSize)
        True
  -- The selected ids, in the form every following statement takes them.
  let deadEventIdArray = PGTextArray (map unEventId deadEventIDs)
  -- Lock the events in the database so that other HGE instances don't pick them up for deletion.
  PG.unitQE
    defaultTxErrorHandler
    [PG.sql|
      UPDATE hdb_catalog.event_log
      SET locked = now()
      WHERE id = ANY($1::text[]);
    |]
    (Identity deadEventIdArray)
    True
  -- Based on the config either delete the corresponding invocation logs or set trigger_name
  -- to appropriate value. Please note that the event_id won't exist anymore in the event_log
  -- table, but we are still retaining it for debugging purpose.
  deletedInvocationLogs <-
    if tlccCleanInvocationLogs
      then
        runIdentity . PG.getRow
          <$> PG.withQE
            defaultTxErrorHandler
            [PG.sql|
              WITH deletedInvocations AS (
                DELETE FROM hdb_catalog.event_invocation_logs
                WHERE event_id = ANY($1::text[])
                RETURNING 1
              )
              SELECT count(*) FROM deletedInvocations;
            |]
            (Identity deadEventIdArray)
            True
      else do
        PG.unitQE
          defaultTxErrorHandler
          [PG.sql|
            UPDATE hdb_catalog.event_invocation_logs
            SET trigger_name = $2
            WHERE event_id = ANY($1::text[])
          |]
          (deadEventIdArray, qTriggerName)
          True
        -- No invocation logs are deleted in this branch.
        pure 0
  -- Finally delete the event logs.
  deletedEventLogs <-
    runIdentity . PG.getRow
      <$> PG.withQE
        defaultTxErrorHandler
        [PG.sql|
          WITH deletedEvents AS (
            DELETE FROM hdb_catalog.event_log
            WHERE id = ANY($1::text[])
            RETURNING 1
          )
          SELECT count(*) FROM deletedEvents;
        |]
        (Identity deadEventIdArray)
        True
  -- Resetting the timeout to default value (0)
  PG.unitQE
    defaultTxErrorHandler
    [PG.sql|
      SET statement_timeout = 0;
    |]
    ()
    False
  -- The record is built from the 'deletedEventLogs' and
  -- 'deletedInvocationLogs' bindings above.
  pure DeletedEventLogStats {..}
  where
    -- NOTE(review): presumably converts seconds to the milliseconds that
    -- @statement_timeout@ takes by default; confirm the unit of 'tlccTimeout'.
    qTimeout :: Int64
    qTimeout = fromIntegral (tlccTimeout * 1000)
    qTriggerName = triggerNameToTxt tlccEventTriggerName
    qRetentionPeriod = tshow tlccClearOlderThan <> " hours"
    qBatchSize :: Int64
    qBatchSize = fromIntegral tlccBatchSize
|
|
|
|
-- | @deleteEventTriggerLogs@ deletes the event logs (and event invocation logs) based on the cleanup configuration given.
-- Each batch handed out by 'deleteEventTriggerLogsInBatchesWith' is processed in its own write
-- transaction, which performs the following steps:
--
--   1. Select all the dead events based on criteria set in the cleanup config.
--   2. Lock the events in the database so that other HGE instances don't pick them up for deletion.
--   3. Based on the config, perform the delete action.
deleteEventTriggerLogs ::
  (MonadIO m, MonadError QErr m) =>
  PGSourceConfig ->
  TriggerLogCleanupConfig ->
  IO (Maybe (TriggerLogCleanupConfig, EventTriggerCleanupStatus)) ->
  m DeletedEventLogStats
deleteEventTriggerLogs sourceConfig oldCleanupConfig getLatestCleanupConfig =
  deleteEventTriggerLogsInBatchesWith
    getLatestCleanupConfig
    oldCleanupConfig
    (\cleanupConfig -> runPgSourceWriteTx sourceConfig (deleteEventTriggerLogsTx cleanupConfig))
|