server: provide an option to enable event triggers on logically replicated tables

## Description ✍️
This PR introduces a new feature to enable or disable event triggers during logical replication of table data for PostgreSQL and MS-SQL data sources. It adds a new field, `trigger_on_replication`, to the `*_create_event_trigger` metadata API. By default, event triggers do not fire during logical replication for Postgres sources (`trigger_on_replication` defaults to `false`), while for MS-SQL sources they do (it defaults to `true`).
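
For illustration, here is a minimal sketch (assuming the `aeson` and `bytestring` libraries; the trigger name, source, table, and webhook URL are placeholders) of how a `pg_create_event_trigger` request body with the new field could be constructed:

```haskell
{-# LANGUAGE OverloadedStrings #-}

import Data.Aeson (Value, encode, object, (.=))
import qualified Data.ByteString.Lazy.Char8 as BL

-- Request body for the pg_create_event_trigger metadata API call; the
-- trigger_on_replication field is the new option introduced in this PR.
createEventTrigger :: Value
createEventTrigger =
  object
    [ "type" .= ("pg_create_event_trigger" :: String),
      "args"
        .= object
          [ "name" .= ("author_trigger" :: String),
            "source" .= ("default" :: String),
            "table" .= object ["schema" .= ("public" :: String), "name" .= ("authors" :: String)],
            "webhook" .= ("https://example.com/webhook" :: String),
            "insert" .= object ["columns" .= ("*" :: String)],
            -- Defaults to false for Postgres sources when omitted.
            "trigger_on_replication" .= True
          ]
    ]

main :: IO ()
main = BL.putStrLn (encode createEventTrigger)
```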

## Changelog ✍️

__Component__ : server

__Type__: feature

__Product__: community-edition

### Short Changelog

Add option to enable/disable event triggers on logically replicated tables

### Related Issues ✍

https://github.com/hasura/graphql-engine/issues/8814
https://hasurahq.atlassian.net/browse/GS-252

### Solution and Design
- By default, triggers do **not** fire in Postgres when the session replication role is `replica`, so if `trigger_on_replication` is set to `true` for an event trigger we run `ALTER TABLE #{tableTxt} ENABLE ALWAYS TRIGGER #{triggerNameTxt};` so that the trigger always fires, irrespective of the `session_replication_role`.
- By default, triggers do fire during replication in MS-SQL, so if `trigger_on_replication` is set to `false` for an event trigger we add a `NOT FOR REPLICATION` clause to the SQL when the trigger is created/altered. This sets `is_not_for_replication` to `true` for the trigger, so it does not fire during logical replication. (A simplified sketch of both behaviours follows this list.)
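
The sketch below is a minimal, self-contained illustration of the two behaviours described above. The `TriggerOnReplication` type mirrors the one added in this PR, but `pgEnableAlwaysSQL` and `mssqlReplicationClause` are illustrative helper names, not the engine's actual code paths:

```haskell
{-# LANGUAGE OverloadedStrings #-}

import Data.Text (Text)

-- Mirrors the type added to Hasura.RQL.Types.Common in this PR.
data TriggerOnReplication = TOREnableTrigger | TORDisableTrigger
  deriving (Show, Eq)

-- Postgres: triggers are skipped when session_replication_role = 'replica',
-- so opting in requires marking the trigger ENABLE ALWAYS.
pgEnableAlwaysSQL :: TriggerOnReplication -> Text -> Text -> Maybe Text
pgEnableAlwaysSQL TOREnableTrigger tableTxt triggerNameTxt =
  Just ("ALTER TABLE " <> tableTxt <> " ENABLE ALWAYS TRIGGER " <> triggerNameTxt <> ";")
pgEnableAlwaysSQL TORDisableTrigger _ _ = Nothing

-- MS-SQL: triggers fire during replication by default, so opting out adds
-- NOT FOR REPLICATION to the CREATE/ALTER TRIGGER statement.
mssqlReplicationClause :: TriggerOnReplication -> Text
mssqlReplicationClause TOREnableTrigger = ""
mssqlReplicationClause TORDisableTrigger = "NOT FOR REPLICATION"

main :: IO ()
main = do
  print (pgEnableAlwaysSQL TOREnableTrigger "\"public\".\"authors\"" "\"notify_hasura_author_trigger_INSERT\"")
  print (mssqlReplicationClause TORDisableTrigger)
```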

### Steps to test and verify ✍
- Run hspec integration tests for HGE

## Server checklist ✍

### Metadata ✍

Does this PR add a new Metadata feature?
- Yes
  - Does `export_metadata`/`replace_metadata` support the new metadata added?
    - 

PR-URL: https://github.com/hasura/graphql-engine-mono/pull/6953
Co-authored-by: Puru Gupta <32328846+purugupta99@users.noreply.github.com>
Co-authored-by: Sean Park-Ross <94021366+seanparkross@users.noreply.github.com>
GitOrigin-RevId: 92731328a2bbdcad2302c829f26f9acb33c36135
Karthikeyan Chinnakonda 2022-11-29 23:11:41 +05:30 committed by hasura-bot
parent 7184c46a4e
commit 32a316aef7
25 changed files with 673 additions and 125 deletions

View File

@ -97,9 +97,17 @@ X-Hasura-Role: admin
| request_transform | false | [RequestTransformation](/api-reference/syntax-defs.mdx#requesttransformation) | Attaches a Request Transformation to the Event Trigger. |
| response_transform | false | [ResponseTransformation](/api-reference/syntax-defs.mdx#responsetransformation) | Attaches a Response Transformation to the Event Trigger. |
| cleanup_config | false | [AutoEventTriggerCleanupConfig](/api-reference/syntax-defs.mdx#autoeventtriggercleanupconfig) | Cleanup config for the auto cleanup process (EE/Cloud only). |
| trigger_on_replication | false | Boolean | Specification for enabling/disabling the Event Trigger during logical replication. |
(\*) Either `webhook` or `webhook_from_env` are required.
:::info Note
The default value of the `trigger_on_replication` parameter for Postgres sources will be `false`, which means that the
trigger will not fire during logical replication of data.
:::
## pg_delete_event_trigger {#metadata-pg-delete-event-trigger}
`pg_delete_event_trigger` is used to delete an event trigger.
@ -257,9 +265,17 @@ X-Hasura-Role: admin
| request_transform | false | [RequestTransformation](/api-reference/syntax-defs.mdx#requesttransformation) | Attaches a Request Transformation to the Event Trigger. |
| response_transform | false | [ResponseTransformation](/api-reference/syntax-defs.mdx#responsetransformation) | Attaches a Response Transformation to the Event Trigger. |
| cleanup_config | false | [AutoEventTriggerCleanupConfig](/api-reference/syntax-defs.mdx#autoeventtriggercleanupconfig) | Cleanup config for the auto cleanup process (EE/Cloud only). |
| trigger_on_replication | false | Boolean | Specification for enabling/disabling the Event Trigger during logical replication. |
(\*) Either `webhook` or `webhook_from_env` are required.
:::info Note
The default value of the `trigger_on_replication` parameter for MSSQL sources will be `true`, which means that the
trigger will be fired during logical replication of data.
:::
## mssql_delete_event_trigger {#metadata-mssql-delete-event-trigger}
`mssql_delete_event_trigger` is used to delete an event trigger.
@ -406,7 +422,7 @@ X-Hasura-Role: admin
## pause_event_trigger_cleanups {#metadata-pause-event-trigger-cleanups}
<div className='badge badge--primary heading-badge'>Available on: Enterprise Edition/Cloud</div>
-
`pause_event_trigger_cleanups` is used to pause the log cleaner for event
triggers which already have a cleaner installed.

View File

@ -88,15 +88,17 @@ library
Test.DataConnector.MockAgent.TransformedConfigurationSpec
Test.DataConnector.QuerySpec
Test.DataConnector.SelectPermissionsSpec
Test.EventTriggers.MSSQL.EventTiggersUniqueNameSpec
Test.EventTriggers.MSSQL.EventTriggersUniqueNameSpec
Test.EventTriggers.MSSQL.EventTriggerDropSourceCleanupSpec
Test.EventTriggers.MSSQL.EventTriggersUntrackTableCleanupSpec
Test.EventTriggers.MSSQL.EventTriggersForReplicationSpec
Test.EventTriggers.PG.EventTriggersExtensionSchemaSpec
Test.EventTriggers.PG.EventTriggersRecreationSpec
Test.EventTriggers.PG.EventTriggersReplaceMetadataCleanupSpec
Test.EventTriggers.PG.EventTriggersRunSQLSpec
Test.EventTriggers.PG.EventTriggersUniqueNameSpec
Test.EventTriggers.PG.EventTriggersUntrackTableCleanupSpec
Test.EventTriggers.PG.EventTriggersForReplicationSpec
Test.Harness.Quoter.YamlSpec
Test.HelloWorldSpec
Test.Mutations.Delete.AllSpec

View File

@ -0,0 +1,195 @@
{-# LANGUAGE QuasiQuotes #-}
{-# LANGUAGE ViewPatterns #-}
-- | Test that event triggers are enabled/disabled when logical replication is used
module Test.EventTriggers.MSSQL.EventTriggersForReplicationSpec (spec) where
import Data.Aeson (Value (..))
import Data.List.NonEmpty qualified as NE
import Harness.Backend.Sqlserver qualified as Sqlserver
import Harness.GraphqlEngine qualified as GraphqlEngine
import Harness.Quoter.Yaml
import Harness.Test.Fixture qualified as Fixture
import Harness.Test.Schema (Table (..), table)
import Harness.Test.Schema qualified as Schema
import Harness.TestEnvironment (TestEnvironment)
import Harness.Webhook qualified as Webhook
import Harness.Yaml (shouldReturnYaml)
import Hasura.Prelude
import Test.Hspec
--------------------------------------------------------------------------------
-- Preamble
spec :: SpecWith TestEnvironment
spec =
Fixture.runWithLocalTestEnvironment
( NE.fromList
[ (Fixture.fixture $ Fixture.Backend Fixture.SQLServer)
{ -- setup the webhook server as the local test environment,
-- so that the server can be referenced while testing
Fixture.mkLocalTestEnvironment = const Webhook.run,
Fixture.setupTeardown = \(testEnvironment, (webhookServer, _)) ->
[ Sqlserver.setupTablesAction (schema "authors" "articles") testEnvironment,
Fixture.SetupAction
{ Fixture.setupAction = pure (),
Fixture.teardownAction = \_ -> mssqlTeardown testEnvironment
}
]
}
]
)
tests
--------------------------------------------------------------------------------
-- * Backend
-- ** Schema
schema :: Text -> Text -> [Schema.Table]
schema authorTableName articleTableName = [authorsTable authorTableName, articlesTable articleTableName]
authorsTable :: Text -> Schema.Table
authorsTable tableName =
(table tableName)
{ tableColumns =
[ Schema.column "id" Schema.TInt,
Schema.column "name" Schema.TStr
],
tablePrimaryKey = ["id"],
tableData =
[ [Schema.VInt 1, Schema.VStr "Author 1"],
[Schema.VInt 2, Schema.VStr "Author 2"]
]
}
articlesTable :: Text -> Schema.Table
articlesTable tableName =
(table tableName)
{ tableColumns =
[ Schema.column "id" Schema.TInt,
Schema.column "name" Schema.TStr
],
tablePrimaryKey = ["id"],
tableData =
[ [Schema.VInt 1, Schema.VStr "Article 1"],
[Schema.VInt 2, Schema.VStr "Article 2"]
]
}
--------------------------------------------------------------------------------
-- Tests
tests :: Fixture.Options -> SpecWith (TestEnvironment, (GraphqlEngine.Server, Webhook.EventsQueue))
tests opts = do
setTriggerForReplication opts
setTriggerForReplication :: Fixture.Options -> SpecWith (TestEnvironment, (GraphqlEngine.Server, Webhook.EventsQueue))
setTriggerForReplication opts =
describe "verify trigger status when logical replication is used" do
it "verify trigger is enabled on logical replication" $
\(testEnvironment, (webhookServer, (Webhook.EventsQueue eventsQueue))) -> do
mssqlSetupWithEventTriggers testEnvironment webhookServer "True"
let getTriggerInfoQuery =
[interpolateYaml|
type: mssql_run_sql
args:
source: mssql
sql: "SELECT name, is_not_for_replication FROM sys.triggers WHERE type='TR' ORDER BY name ASC;"
|]
expectedResponseForEnablingTriggers =
[yaml|
result_type: TuplesOk
result:
-
- name
- is_not_for_replication
-
- notify_hasura_author_trigger_DELETE
- False
-
- notify_hasura_author_trigger_INSERT
- False
-
- notify_hasura_author_trigger_UPDATE
- False
|]
shouldReturnYaml
opts
(GraphqlEngine.postV2Query 200 testEnvironment getTriggerInfoQuery)
expectedResponseForEnablingTriggers
it "verify trigger is disabled on logical replication" $
\(testEnvironment, (webhookServer, (Webhook.EventsQueue eventsQueue))) -> do
mssqlSetupWithEventTriggers testEnvironment webhookServer "False"
let getTriggerInfoQuery =
[interpolateYaml|
type: mssql_run_sql
args:
source: mssql
sql: "SELECT name, is_not_for_replication FROM sys.triggers WHERE type='TR' ORDER BY name ASC;"
|]
expectedResponseForDisablingTriggers =
[yaml|
result_type: TuplesOk
result:
-
- name
- is_not_for_replication
-
- notify_hasura_author_trigger_DELETE
- True
-
- notify_hasura_author_trigger_INSERT
- True
-
- notify_hasura_author_trigger_UPDATE
- True
|]
shouldReturnYaml
opts
(GraphqlEngine.postV2Query 200 testEnvironment getTriggerInfoQuery)
expectedResponseForDisablingTriggers
--------------------------------------------------------------------------------
-- ** Setup and teardown override
mssqlSetupWithEventTriggers :: TestEnvironment -> GraphqlEngine.Server -> Text -> IO ()
mssqlSetupWithEventTriggers testEnvironment webhookServer triggerOnReplication = do
let schemaName :: Schema.SchemaName
schemaName = Schema.getSchemaName testEnvironment
webhookServerEchoEndpoint = GraphqlEngine.serverUrl webhookServer ++ "/echo"
GraphqlEngine.postMetadata_ testEnvironment $
[interpolateYaml|
type: bulk
args:
- type: mssql_create_event_trigger
args:
name: author_trigger
source: mssql
table:
name: authors
schema: #{schemaName}
webhook: #{webhookServerEchoEndpoint}
trigger_on_replication: #{triggerOnReplication}
delete:
columns: "*"
insert:
columns: "*"
update:
columns: "*"
|]
mssqlTeardown :: TestEnvironment -> IO ()
mssqlTeardown testEnvironment = do
GraphqlEngine.postMetadata_ testEnvironment $
[yaml|
type: mssql_delete_event_trigger
args:
name: author_trigger
source: mssql
|]

View File

@ -2,7 +2,7 @@
{-# LANGUAGE ViewPatterns #-}
-- | Test that only event triggers with unique names are allowed
module Test.EventTriggers.MSSQL.EventTiggersUniqueNameSpec (spec) where
module Test.EventTriggers.MSSQL.EventTriggersUniqueNameSpec (spec) where
import Control.Concurrent.Chan qualified as Chan
import Data.Aeson (Value (..))

View File

@ -0,0 +1,197 @@
{-# LANGUAGE QuasiQuotes #-}
{-# LANGUAGE ViewPatterns #-}
-- | Test that event triggers are enabled/disabled when logical replication is used
module Test.EventTriggers.PG.EventTriggersForReplicationSpec (spec) where
import Data.Aeson (Value (..))
import Data.List.NonEmpty qualified as NE
import Harness.Backend.Postgres qualified as Postgres
import Harness.GraphqlEngine qualified as GraphqlEngine
import Harness.Quoter.Yaml
import Harness.Test.Fixture qualified as Fixture
import Harness.Test.Schema (Table (..), table)
import Harness.Test.Schema qualified as Schema
import Harness.TestEnvironment (TestEnvironment)
import Harness.Webhook qualified as Webhook
import Harness.Yaml (shouldReturnYaml)
import Hasura.Prelude
import Test.Hspec
--------------------------------------------------------------------------------
-- Preamble
spec :: SpecWith TestEnvironment
spec =
Fixture.runWithLocalTestEnvironment
( NE.fromList
[ (Fixture.fixture $ Fixture.Backend Fixture.Postgres)
{ -- setup the webhook server as the local test environment,
-- so that the server can be referenced while testing
Fixture.mkLocalTestEnvironment = const Webhook.run,
Fixture.setupTeardown = \(testEnvironment, (webhookServer, _)) ->
[ Postgres.setupTablesAction (schema "authors" "articles") testEnvironment,
Fixture.SetupAction
{ Fixture.setupAction = pure (),
Fixture.teardownAction = \_ -> postgresTeardown testEnvironment
}
]
}
]
)
tests
--------------------------------------------------------------------------------
-- * Backend
-- ** Schema
schema :: Text -> Text -> [Schema.Table]
schema authorTableName articleTableName = [authorsTable authorTableName, articlesTable articleTableName]
authorsTable :: Text -> Schema.Table
authorsTable tableName =
(table tableName)
{ tableColumns =
[ Schema.column "id" Schema.TInt,
Schema.column "name" Schema.TStr
],
tablePrimaryKey = ["id"],
tableData =
[ [Schema.VInt 1, Schema.VStr "Author 1"],
[Schema.VInt 2, Schema.VStr "Author 2"]
]
}
articlesTable :: Text -> Schema.Table
articlesTable tableName =
(table tableName)
{ tableColumns =
[ Schema.column "id" Schema.TInt,
Schema.column "name" Schema.TStr
],
tablePrimaryKey = ["id"],
tableData =
[ [Schema.VInt 1, Schema.VStr "Article 1"],
[Schema.VInt 2, Schema.VStr "Article 2"]
]
}
--------------------------------------------------------------------------------
-- Tests
tests :: Fixture.Options -> SpecWith (TestEnvironment, (GraphqlEngine.Server, Webhook.EventsQueue))
tests opts = do
setTriggerForReplication opts
setTriggerForReplication :: Fixture.Options -> SpecWith (TestEnvironment, (GraphqlEngine.Server, Webhook.EventsQueue))
setTriggerForReplication opts =
describe "verify trigger status when logical replication is used" do
it "verify trigger is enabled on logical replication" $
\(testEnvironment, (webhookServer, (Webhook.EventsQueue eventsQueue))) -> do
postgresSetupWithEventTriggers testEnvironment webhookServer "True"
let getTriggerInfoQuery =
[interpolateYaml|
type: run_sql
args:
source: postgres
sql: "SELECT tgname, tgenabled FROM pg_trigger WHERE tgrelid = 'authors'::regclass ORDER BY tgname ASC;"
|]
-- tgenabled: `A` specifies that trigger will always fire, that is, in all modes
-- origin, local and replica
expectedResponseForEnablingTriggers =
[yaml|
result_type: TuplesOk
result:
-
- tgname
- tgenabled
-
- notify_hasura_author_trigger_DELETE
- A
-
- notify_hasura_author_trigger_INSERT
- A
-
- notify_hasura_author_trigger_UPDATE
- A
|]
shouldReturnYaml
opts
(GraphqlEngine.postV2Query 200 testEnvironment getTriggerInfoQuery)
expectedResponseForEnablingTriggers
it "verify trigger is disabled on logical replication" $
\(testEnvironment, (webhookServer, (Webhook.EventsQueue eventsQueue))) -> do
postgresSetupWithEventTriggers testEnvironment webhookServer "False"
let getTriggerInfoQuery =
[interpolateYaml|
type: run_sql
args:
source: postgres
sql: "SELECT tgname, tgenabled FROM pg_trigger WHERE tgrelid = 'authors'::regclass ORDER BY tgname ASC;"
|]
-- tgenabled: `O` specifies that trigger will fire in only origin &
-- local modes, not replica mode
expectedResponseForDisablingTriggers =
[yaml|
result_type: TuplesOk
result:
-
- tgname
- tgenabled
-
- notify_hasura_author_trigger_DELETE
- O
-
- notify_hasura_author_trigger_INSERT
- O
-
- notify_hasura_author_trigger_UPDATE
- O
|]
shouldReturnYaml
opts
(GraphqlEngine.postV2Query 200 testEnvironment getTriggerInfoQuery)
expectedResponseForDisablingTriggers
--------------------------------------------------------------------------------
-- ** Setup and teardown override
postgresSetupWithEventTriggers :: TestEnvironment -> GraphqlEngine.Server -> Text -> IO ()
postgresSetupWithEventTriggers testEnvironment webhookServer triggerOnReplication = do
let schemaName :: Schema.SchemaName
schemaName = Schema.getSchemaName testEnvironment
webhookServerEchoEndpoint = GraphqlEngine.serverUrl webhookServer ++ "/echo"
GraphqlEngine.postMetadata_ testEnvironment $
[interpolateYaml|
type: pg_create_event_trigger
args:
name: author_trigger
source: postgres
table:
name: authors
schema: #{schemaName}
webhook: #{webhookServerEchoEndpoint}
trigger_on_replication: #{triggerOnReplication}
delete:
columns: "*"
insert:
columns: "*"
update:
columns: "*"
|]
postgresTeardown :: TestEnvironment -> IO ()
postgresTeardown testEnvironment = do
GraphqlEngine.postMetadata_ testEnvironment $
[yaml|
type: pg_delete_event_trigger
args:
name: author_trigger
source: postgres
|]

View File

@ -112,3 +112,5 @@ instance Backend 'BigQuery where
resizeSourcePools _sourceConfig _serverReplicas =
-- BigQuery does not posses connection pooling
pure ()
defaultTriggerOnReplication = error "Event triggers are not supported for the BigQuery source."

View File

@ -154,6 +154,8 @@ instance Backend 'DataConnector where
-- Data connectors do not have concept of connection pools
pure ()
defaultTriggerOnReplication = error "Event triggers is not implemented for the data connector backend."
data CustomBooleanOperator a = CustomBooleanOperator
{ _cboName :: Text,
_cboRHS :: Maybe (Either (RootOrCurrentColumn 'DataConnector) a) -- TODO turn Either into a specific type

View File

@ -216,13 +216,14 @@ createTableEventTrigger ::
TableName ->
[ColumnInfo 'MSSQL] ->
TriggerName ->
TriggerOnReplication ->
TriggerOpsDef 'MSSQL ->
Maybe (PrimaryKey 'MSSQL (ColumnInfo 'MSSQL)) ->
m (Either QErr ())
createTableEventTrigger _serverConfigCtx sourceConfig table columns triggerName opsDefinition primaryKeyMaybe = do
createTableEventTrigger _serverConfigCtx sourceConfig table columns triggerName triggerOnReplication opsDefinition primaryKeyMaybe = do
liftIO $
runMSSQLSourceWriteTx sourceConfig $ do
mkAllTriggersQ triggerName table columns opsDefinition primaryKeyMaybe
mkAllTriggersQ triggerName table triggerOnReplication columns opsDefinition primaryKeyMaybe
createMissingSQLTriggers ::
( MonadIO m,
@ -233,22 +234,29 @@ createMissingSQLTriggers ::
TableName ->
([ColumnInfo 'MSSQL], Maybe (PrimaryKey 'MSSQL (ColumnInfo 'MSSQL))) ->
TriggerName ->
TriggerOnReplication ->
TriggerOpsDef 'MSSQL ->
m ()
createMissingSQLTriggers sourceConfig table@(TableName tableNameText (SchemaName schemaText)) (allCols, primaryKeyMaybe) triggerName opsDefinition = do
liftEitherM $
runMSSQLSourceWriteTx sourceConfig $ do
for_ (tdInsert opsDefinition) (doesSQLTriggerExist INSERT)
for_ (tdUpdate opsDefinition) (doesSQLTriggerExist UPDATE)
for_ (tdDelete opsDefinition) (doesSQLTriggerExist DELETE)
where
doesSQLTriggerExist op opSpec = do
let triggerNameWithOp = "notify_hasura_" <> triggerNameToTxt triggerName <> "_" <> tshow op
doesOpTriggerExist <-
liftMSSQLTx $
singleRowQueryE
HGE.defaultMSSQLTxErrorHandler
[ODBC.sql|
createMissingSQLTriggers
sourceConfig
table@(TableName tableNameText (SchemaName schemaText))
(allCols, primaryKeyMaybe)
triggerName
triggerOnReplication
opsDefinition = do
liftEitherM $
runMSSQLSourceWriteTx sourceConfig $ do
for_ (tdInsert opsDefinition) (doesSQLTriggerExist INSERT)
for_ (tdUpdate opsDefinition) (doesSQLTriggerExist UPDATE)
for_ (tdDelete opsDefinition) (doesSQLTriggerExist DELETE)
where
doesSQLTriggerExist op opSpec = do
let triggerNameWithOp = "notify_hasura_" <> triggerNameToTxt triggerName <> "_" <> tshow op
doesOpTriggerExist <-
liftMSSQLTx $
singleRowQueryE
HGE.defaultMSSQLTxErrorHandler
[ODBC.sql|
SELECT CASE WHEN EXISTS
( SELECT 1
FROM sys.triggers tr
@ -260,12 +268,12 @@ createMissingSQLTriggers sourceConfig table@(TableName tableNameText (SchemaName
ELSE CAST(0 AS BIT)
END;
|]
unless doesOpTriggerExist $ do
case op of
INSERT -> mkInsertTriggerQ triggerName table allCols opSpec
UPDATE -> mkUpdateTriggerQ triggerName table allCols primaryKeyMaybe opSpec
DELETE -> mkDeleteTriggerQ triggerName table allCols opSpec
MANUAL -> pure ()
unless doesOpTriggerExist $ do
case op of
INSERT -> mkInsertTriggerQ triggerName table allCols triggerOnReplication opSpec
UPDATE -> mkUpdateTriggerQ triggerName table allCols triggerOnReplication primaryKeyMaybe opSpec
DELETE -> mkDeleteTriggerQ triggerName table allCols triggerOnReplication opSpec
MANUAL -> pure ()
unlockEventsInSource ::
MonadIO m =>
@ -517,9 +525,9 @@ checkEventTx eventId = do
HGE.defaultMSSQLTxErrorHandler
[ODBC.sql|
SELECT
CAST(CASE
CAST(CASE
WHEN (l.locked IS NOT NULL AND l.locked >= DATEADD(MINUTE, -30, SYSDATETIMEOFFSET())) THEN 1 ELSE 0
END
END
AS bit)
FROM hdb_catalog.event_log l
WHERE l.id = $eId
@ -633,14 +641,15 @@ mkAllTriggersQ ::
MonadMSSQLTx m =>
TriggerName ->
TableName ->
TriggerOnReplication ->
[ColumnInfo 'MSSQL] ->
TriggerOpsDef 'MSSQL ->
Maybe (PrimaryKey 'MSSQL (ColumnInfo 'MSSQL)) ->
m ()
mkAllTriggersQ triggerName tableName allCols fullSpec primaryKey = do
for_ (tdInsert fullSpec) (mkInsertTriggerQ triggerName tableName allCols)
for_ (tdDelete fullSpec) (mkDeleteTriggerQ triggerName tableName allCols)
for_ (tdUpdate fullSpec) (mkUpdateTriggerQ triggerName tableName allCols primaryKey)
mkAllTriggersQ triggerName tableName triggerOnReplication allCols fullSpec primaryKey = do
for_ (tdInsert fullSpec) (mkInsertTriggerQ triggerName tableName allCols triggerOnReplication)
for_ (tdDelete fullSpec) (mkDeleteTriggerQ triggerName tableName allCols triggerOnReplication)
for_ (tdUpdate fullSpec) (mkUpdateTriggerQ triggerName tableName allCols triggerOnReplication primaryKey)
getApplicableColumns :: [ColumnInfo 'MSSQL] -> SubscribeColumns 'MSSQL -> [ColumnInfo 'MSSQL]
getApplicableColumns allColumnInfos = \case
@ -670,40 +679,43 @@ mkInsertTriggerQ ::
TriggerName ->
TableName ->
[ColumnInfo 'MSSQL] ->
TriggerOnReplication ->
SubscribeOpSpec 'MSSQL ->
m ()
mkInsertTriggerQ triggerName table allCols subOpSpec@(SubscribeOpSpec _listenCols deliveryCols) = do
mkInsertTriggerQ triggerName table allCols triggerOnReplication subOpSpec@(SubscribeOpSpec _listenCols deliveryCols) = do
checkSpatialDataTypeColumns allCols subOpSpec
liftMSSQLTx $ do
unitQueryE HGE.defaultMSSQLTxErrorHandler $
rawUnescapedText . LT.toStrict $ do
let deliveryColumns = getApplicableColumns allCols $ fromMaybe SubCStar deliveryCols
mkInsertTriggerQuery table triggerName deliveryColumns
mkInsertTriggerQuery table triggerName deliveryColumns triggerOnReplication
mkDeleteTriggerQ ::
MonadMSSQLTx m =>
TriggerName ->
TableName ->
[ColumnInfo 'MSSQL] ->
TriggerOnReplication ->
SubscribeOpSpec 'MSSQL ->
m ()
mkDeleteTriggerQ triggerName table allCols subOpSpec@(SubscribeOpSpec _listenCols deliveryCols) = do
mkDeleteTriggerQ triggerName table allCols triggerOnReplication subOpSpec@(SubscribeOpSpec _listenCols deliveryCols) = do
checkSpatialDataTypeColumns allCols subOpSpec
liftMSSQLTx $ do
unitQueryE HGE.defaultMSSQLTxErrorHandler $
rawUnescapedText . LT.toStrict $ do
let deliveryColumns = getApplicableColumns allCols $ fromMaybe SubCStar deliveryCols
mkDeleteTriggerQuery table triggerName deliveryColumns
mkDeleteTriggerQuery table triggerName deliveryColumns triggerOnReplication
mkUpdateTriggerQ ::
MonadMSSQLTx m =>
TriggerName ->
TableName ->
[ColumnInfo 'MSSQL] ->
TriggerOnReplication ->
Maybe (PrimaryKey 'MSSQL (ColumnInfo 'MSSQL)) ->
SubscribeOpSpec 'MSSQL ->
m ()
mkUpdateTriggerQ triggerName table allCols primaryKeyMaybe subOpSpec@(SubscribeOpSpec listenCols deliveryCols) = do
mkUpdateTriggerQ triggerName table allCols triggerOnReplication primaryKeyMaybe subOpSpec@(SubscribeOpSpec listenCols deliveryCols) = do
checkSpatialDataTypeColumns allCols subOpSpec
liftMSSQLTx $ do
primaryKey <- onNothing primaryKeyMaybe (throw400 NotSupported "Update event triggers for MS-SQL sources are only supported on tables with primary keys")
@ -711,7 +723,7 @@ mkUpdateTriggerQ triggerName table allCols primaryKeyMaybe subOpSpec@(SubscribeO
listenColumns = getApplicableColumns allCols listenCols
unitQueryE HGE.defaultMSSQLTxErrorHandler $
rawUnescapedText . LT.toStrict $
mkUpdateTriggerQuery table triggerName listenColumns deliveryColumns primaryKey
mkUpdateTriggerQuery table triggerName listenColumns deliveryColumns primaryKey triggerOnReplication
-- Create alias for columns
-- eg: If colPrefixMaybe is defined then 'inserted.id as payload.data.old.id'
@ -747,22 +759,24 @@ generateColumnTriggerAlias op colPrefixMaybe colInfo =
qualifyTableName :: TableName -> Text
qualifyTableName = toTxt . toQueryFlat . fromTableName
mkInsertTriggerQuery :: TableName -> TriggerName -> [ColumnInfo 'MSSQL] -> LT.Text
mkInsertTriggerQuery table@(TableName tableName schema@(SchemaName schemaName)) triggerName columns =
mkInsertTriggerQuery :: TableName -> TriggerName -> [ColumnInfo 'MSSQL] -> TriggerOnReplication -> LT.Text
mkInsertTriggerQuery table@(TableName tableName schema@(SchemaName schemaName)) triggerName columns triggerOnReplication =
let QualifiedTriggerName qualifiedTriggerName = msssqlIdenTrigger INSERT schema triggerName
triggerNameText = triggerNameToTxt triggerName
qualifiedTableName = qualifyTableName table
operation = tshow INSERT
replicationClause :: String = if triggerOnReplication /= TOREnableTrigger then "NOT FOR REPLICATION" else ""
deliveryColsSQLExpression :: Text =
commaSeparated $ map (unSQLFragment . generateColumnTriggerAlias NEW Nothing) columns
in $(makeRelativeToProject "src-rsr/mssql/mssql_insert_trigger.sql.shakespeare" >>= ST.stextFile)
mkDeleteTriggerQuery :: TableName -> TriggerName -> [ColumnInfo 'MSSQL] -> LT.Text
mkDeleteTriggerQuery table@(TableName tableName schema@(SchemaName schemaName)) triggerName columns =
mkDeleteTriggerQuery :: TableName -> TriggerName -> [ColumnInfo 'MSSQL] -> TriggerOnReplication -> LT.Text
mkDeleteTriggerQuery table@(TableName tableName schema@(SchemaName schemaName)) triggerName columns triggerOnReplication =
let QualifiedTriggerName qualifiedTriggerName = msssqlIdenTrigger DELETE schema triggerName
triggerNameText = triggerNameToTxt triggerName
qualifiedTableName = qualifyTableName table
operation = tshow DELETE
replicationClause :: String = if triggerOnReplication /= TOREnableTrigger then "NOT FOR REPLICATION" else ""
deliveryColsSQLExpression :: Text = commaSeparated $ map (unSQLFragment . generateColumnTriggerAlias OLD Nothing) columns
in $(makeRelativeToProject "src-rsr/mssql/mssql_delete_trigger.sql.shakespeare" >>= ST.stextFile)
@ -841,17 +855,19 @@ The spec for MSSQL UPDATE Event Trigger is as follows:
b. If the updated primary key is not equal to any of the already present primary key
in the table then, 'data.old' is NULL and only 'data.new' is constructed.
-}
mkUpdateTriggerQuery :: TableName -> TriggerName -> [ColumnInfo 'MSSQL] -> [ColumnInfo 'MSSQL] -> PrimaryKey 'MSSQL (ColumnInfo 'MSSQL) -> LT.Text
mkUpdateTriggerQuery :: TableName -> TriggerName -> [ColumnInfo 'MSSQL] -> [ColumnInfo 'MSSQL] -> PrimaryKey 'MSSQL (ColumnInfo 'MSSQL) -> TriggerOnReplication -> LT.Text
mkUpdateTriggerQuery
table@(TableName tableName schema@(SchemaName schemaName))
triggerName
listenColumns
deliveryColumns
primaryKey =
primaryKey
triggerOnReplication =
let QualifiedTriggerName qualifiedTriggerName = msssqlIdenTrigger UPDATE schema triggerName
triggerNameText = triggerNameToTxt triggerName
qualifiedTableName = qualifyTableName table
operation = tshow UPDATE
replicationClause :: String = if triggerOnReplication /= TOREnableTrigger then "NOT FOR REPLICATION" else ""
oldDeliveryColsSQLExp :: Text = commaSeparated $ map (unSQLFragment . generateColumnTriggerAlias OLD (Just "DELETED")) deliveryColumns
newDeliveryColsSQLExp :: Text = commaSeparated $ map (unSQLFragment . generateColumnTriggerAlias NEW (Just "INSERTED")) deliveryColumns
@ -939,9 +955,9 @@ selectLastCleanupScheduledTimestamp triggerNames =
HGE.defaultMSSQLTxErrorHandler
( rawUnescapedText
[ST.st|
SELECT trigger_name, count(1), max(scheduled_at)
FROM hdb_catalog.hdb_event_log_cleanups
WHERE status='scheduled' AND trigger_name =
SELECT trigger_name, count(1), max(scheduled_at)
FROM hdb_catalog.hdb_event_log_cleanups
WHERE status='scheduled' AND trigger_name =
ANY(SELECT n from (VALUES #{triggerNamesValues}) AS X(n))
GROUP BY trigger_name;
|]
@ -993,7 +1009,7 @@ getCleanupEventsForDeletionTx = do
( rawUnescapedText
[ST.st|
SELECT CAST(id AS nvarchar(36)) FROM hdb_catalog.hdb_event_log_cleanups
WHERE status = 'scheduled' AND scheduled_at < CURRENT_TIMESTAMP AND id NOT IN
WHERE status = 'scheduled' AND scheduled_at < CURRENT_TIMESTAMP AND id NOT IN
(SELECT n from (VALUES #{cleanupIDsSQLValue}) AS X(n));
|]
)
@ -1106,7 +1122,7 @@ deleteEventTriggerLogsTx TriggerLogCleanupConfig {..} = do
[ST.st|
UPDATE hdb_catalog.event_log
SET locked = CURRENT_TIMESTAMP
WHERE id = ANY ( SELECT id from (VALUES #{eventIdsValues}) AS X(id))
WHERE id = ANY ( SELECT id from (VALUES #{eventIdsValues}) AS X(id))
AND locked IS NULL
|]
-- Based on the config either delete the corresponding invocation logs or set trigger_name

View File

@ -17,6 +17,7 @@ import Hasura.Backends.MSSQL.Types.Update qualified as MSSQL (BackendUpdate)
import Hasura.Base.Error
import Hasura.Prelude
import Hasura.RQL.Types.Backend
import Hasura.RQL.Types.Common (TriggerOnReplication (..))
import Hasura.RQL.Types.HealthCheck
import Hasura.RQL.Types.HealthCheckImplementation (HealthCheckImplementation (..))
import Hasura.RQL.Types.ResizePool (ServerReplicas)
@ -119,3 +120,5 @@ instance Backend 'MSSQL where
resizeSourcePools :: SourceConfig 'MSSQL -> ServerReplicas -> IO ()
resizeSourcePools sourceConfig =
MSSQL.mssqlResizePools (MSSQL._mscExecCtx sourceConfig)
defaultTriggerOnReplication = TOREnableTrigger

View File

@ -144,3 +144,5 @@ instance Backend 'MySQL where
Pool.resizePool pool (maxConnections `div` getServerReplicasInt serverReplicas)
-- Trim pool by destroying excess resources, if any
Pool.tryTrimPool pool
defaultTriggerOnReplication = error "Event triggers are not implemented for the MySQL source."

View File

@ -210,9 +210,10 @@ createMissingSQLTriggers ::
TableName ('Postgres pgKind) ->
([(ColumnInfo ('Postgres pgKind))], Maybe (PrimaryKey ('Postgres pgKind) (ColumnInfo ('Postgres pgKind)))) ->
TriggerName ->
TriggerOnReplication ->
TriggerOpsDef ('Postgres pgKind) ->
m ()
createMissingSQLTriggers sourceConfig table (allCols, _) triggerName opsDefinition = do
createMissingSQLTriggers sourceConfig table (allCols, _) triggerName triggerOnReplication opsDefinition = do
serverConfigCtx <- askServerConfigCtx
liftEitherM $
runPgSourceWriteTx sourceConfig $ do
@ -237,7 +238,7 @@ createMissingSQLTriggers sourceConfig table (allCols, _) triggerName opsDefiniti
True
unless doesOpTriggerFunctionExist $
flip runReaderT serverConfigCtx $
mkTrigger triggerName table allCols op opSpec
mkTrigger triggerName table triggerOnReplication allCols op opSpec
createTableEventTrigger ::
(Backend ('Postgres pgKind), MonadIO m, MonadBaseControl IO m) =>
@ -246,13 +247,14 @@ createTableEventTrigger ::
QualifiedTable ->
[ColumnInfo ('Postgres pgKind)] ->
TriggerName ->
TriggerOnReplication ->
TriggerOpsDef ('Postgres pgKind) ->
Maybe (PrimaryKey ('Postgres pgKind) (ColumnInfo ('Postgres pgKind))) ->
m (Either QErr ())
createTableEventTrigger serverConfigCtx sourceConfig table columns triggerName opsDefinition _ = runPgSourceWriteTx sourceConfig $ do
createTableEventTrigger serverConfigCtx sourceConfig table columns triggerName triggerOnReplication opsDefinition _ = runPgSourceWriteTx sourceConfig $ do
-- Create the given triggers
flip runReaderT serverConfigCtx $
mkAllTriggersQ triggerName table columns opsDefinition
mkAllTriggersQ triggerName table triggerOnReplication columns opsDefinition
dropDanglingSQLTrigger ::
( MonadIO m,
@ -796,38 +798,49 @@ mkTrigger ::
(Backend ('Postgres pgKind), MonadTx m, MonadReader ServerConfigCtx m) =>
TriggerName ->
QualifiedTable ->
TriggerOnReplication ->
[ColumnInfo ('Postgres pgKind)] ->
Ops ->
SubscribeOpSpec ('Postgres pgKind) ->
m ()
mkTrigger triggerName table allCols op subOpSpec = do
mkTrigger triggerName table triggerOnReplication allCols op subOpSpec = do
-- create/replace the trigger function
dbTriggerName <- mkTriggerFunctionQ triggerName table allCols op subOpSpec
QualifiedTriggerName dbTriggerNameTxt <- mkTriggerFunctionQ triggerName table allCols op subOpSpec
-- check if the SQL trigger exists and only if the SQL trigger doesn't exist
-- we create the SQL trigger.
doesTriggerExist <- liftTx $ checkIfTriggerExistsForTableQ (pgTriggerName op triggerName) table
unless doesTriggerExist $
let sqlQuery =
PG.fromText $ createTriggerSQL dbTriggerName (toSQLTxt table) (tshow op)
in liftTx $ PG.unitQE defaultTxErrorHandler sqlQuery () False
let createTriggerSqlQuery =
PG.fromText $ createTriggerSQL dbTriggerNameTxt (toSQLTxt table) (tshow op)
in liftTx $ do
PG.unitQE defaultTxErrorHandler createTriggerSqlQuery () False
when (triggerOnReplication == TOREnableTrigger) $
PG.unitQE defaultTxErrorHandler (alwaysEnableTriggerQuery dbTriggerNameTxt (toSQLTxt table)) () False
where
createTriggerSQL (QualifiedTriggerName triggerNameTxt) tableName opText =
createTriggerSQL triggerNameTxt tableName opText =
[ST.st|
CREATE TRIGGER #{triggerNameTxt} AFTER #{opText} ON #{tableName} FOR EACH ROW EXECUTE PROCEDURE hdb_catalog.#{triggerNameTxt}()
|]
alwaysEnableTriggerQuery triggerNameTxt tableTxt =
PG.fromText $
[ST.st|
ALTER TABLE #{tableTxt} ENABLE ALWAYS TRIGGER #{triggerNameTxt};
|]
mkAllTriggersQ ::
forall pgKind m.
(Backend ('Postgres pgKind), MonadTx m, MonadReader ServerConfigCtx m) =>
TriggerName ->
QualifiedTable ->
TriggerOnReplication ->
[ColumnInfo ('Postgres pgKind)] ->
TriggerOpsDef ('Postgres pgKind) ->
m ()
mkAllTriggersQ triggerName table allCols fullspec = do
for_ (tdInsert fullspec) (mkTrigger triggerName table allCols INSERT)
for_ (tdUpdate fullspec) (mkTrigger triggerName table allCols UPDATE)
for_ (tdDelete fullspec) (mkTrigger triggerName table allCols DELETE)
mkAllTriggersQ triggerName table triggerOnReplication allCols fullspec = do
for_ (tdInsert fullspec) (mkTrigger triggerName table triggerOnReplication allCols INSERT)
for_ (tdUpdate fullspec) (mkTrigger triggerName table triggerOnReplication allCols UPDATE)
for_ (tdDelete fullspec) (mkTrigger triggerName table triggerOnReplication allCols DELETE)
-- | Add cleanup logs for given trigger names and cleanup configs. This will perform the following steps:
--

View File

@ -263,9 +263,8 @@ withMetadataCheck source cascade txAccess runSQLQuery = do
forM_ (M.elems tables) $ \(TableInfo coreInfo _ eventTriggers _) -> do
let table = _tciName coreInfo
columns = getCols $ _tciFieldInfoMap coreInfo
forM_ (M.toList eventTriggers) $ \(triggerName, eti) -> do
let opsDefinition = etiOpsDef eti
flip runReaderT serverConfigCtx $ mkAllTriggersQ triggerName table columns opsDefinition
forM_ (M.toList eventTriggers) $ \(triggerName, EventTriggerInfo {etiOpsDef, etiTriggerOnReplication}) -> do
flip runReaderT serverConfigCtx $ mkAllTriggersQ triggerName table etiTriggerOnReplication columns etiOpsDef
-- | @'runTxWithMetadataCheck source sourceConfig txAccess tableCache functionCache cascadeDependencies tx' checks for
-- changes in GraphQL Engine metadata when a @'tx' is executed on the database alters Postgres

View File

@ -30,7 +30,7 @@ import Hasura.Base.Error
import Hasura.Prelude
import Hasura.RQL.IR.BoolExp.AggregationPredicates qualified as Agg
import Hasura.RQL.Types.Backend
import Hasura.RQL.Types.Common (SourceName)
import Hasura.RQL.Types.Common (SourceName, TriggerOnReplication (..))
import Hasura.RQL.Types.HealthCheck
import Hasura.RQL.Types.HealthCheckImplementation (HealthCheckImplementation (..))
import Hasura.SQL.Backend
@ -152,3 +152,5 @@ instance
namingConventionSupport = Postgres.namingConventionSupport
resizeSourcePools sourceConfig = Postgres._pecResizePools (Postgres._pscExecCtx sourceConfig)
defaultTriggerOnReplication = TORDisableTrigger

View File

@ -33,6 +33,7 @@ module Hasura.RQL.DDL.EventTrigger
cetqRequestTransform,
cetqResponseTrasnform,
cteqCleanupConfig,
cteqTriggerOnReplication,
runCleanupEventTriggerLog,
runEventTriggerResumeCleanup,
runEventTriggerPauseCleanup,
@ -97,7 +98,8 @@ data CreateEventTriggerQuery (b :: BackendType) = CreateEventTriggerQuery
_cetqReplace :: Bool,
_cetqRequestTransform :: Maybe RequestTransform,
_cetqResponseTrasnform :: Maybe MetadataResponseTransform,
_cteqCleanupConfig :: Maybe AutoTriggerLogCleanupConfig
_cteqCleanupConfig :: Maybe AutoTriggerLogCleanupConfig,
_cteqTriggerOnReplication :: TriggerOnReplication
}
$(makeLenses ''CreateEventTriggerQuery)
@ -134,7 +136,8 @@ instance Backend b => FromJSON (CreateEventTriggerQuery b) where
(Just _, Just _) -> fail "only one of webhook or webhook_from_env should be given"
_ -> fail "must provide webhook or webhook_from_env"
mapM_ checkEmptyCols [insert, update, delete]
return $ CreateEventTriggerQuery sourceName name table insert update delete (Just enableManual) retryConf webhook webhookFromEnv headers replace requestTransform responseTransform cleanupConfig
triggerOnReplication <- o .:? "trigger_on_replication" .!= defaultTriggerOnReplication @b
return $ CreateEventTriggerQuery sourceName name table insert update delete (Just enableManual) retryConf webhook webhookFromEnv headers replace requestTransform responseTransform cleanupConfig triggerOnReplication
where
checkEmptyCols spec =
case spec of
@ -208,7 +211,7 @@ resolveEventTriggerQuery ::
(Backend b, UserInfoM m, QErrM m, CacheRM m) =>
CreateEventTriggerQuery b ->
m (Bool, EventTriggerConf b)
resolveEventTriggerQuery (CreateEventTriggerQuery source name qt insert update delete enableManual retryConf webhook webhookFromEnv mheaders replace reqTransform respTransform cleanupConfig) = do
resolveEventTriggerQuery (CreateEventTriggerQuery source name qt insert update delete enableManual retryConf webhook webhookFromEnv mheaders replace reqTransform respTransform cleanupConfig triggerOnReplication) = do
ti <- askTableCoreInfo source qt
-- can only replace for same table
when replace $ do
@ -220,7 +223,7 @@ resolveEventTriggerQuery (CreateEventTriggerQuery source name qt insert update d
assertCols ti delete
let rconf = fromMaybe defaultRetryConf retryConf
return (replace, EventTriggerConf name (TriggerOpsDef insert update delete enableManual) webhook webhookFromEnv rconf mheaders reqTransform respTransform cleanupConfig)
return (replace, EventTriggerConf name (TriggerOpsDef insert update delete enableManual) webhook webhookFromEnv rconf mheaders reqTransform respTransform cleanupConfig triggerOnReplication)
where
assertCols :: TableCoreInfo b -> Maybe (SubscribeOpSpec b) -> m ()
assertCols ti opSpec = for_ opSpec \sos -> case sosColumns sos of
@ -454,23 +457,48 @@ buildEventTriggerInfo ::
TableName b ->
EventTriggerConf b ->
m (EventTriggerInfo b, [SchemaDependency])
buildEventTriggerInfo env source tableName (EventTriggerConf name def webhook webhookFromEnv rconf mheaders reqTransform respTransform cleanupConfig) = do
webhookConf <- case (webhook, webhookFromEnv) of
(Just w, Nothing) -> return $ WCValue w
(Nothing, Just wEnv) -> return $ WCEnv wEnv
_ -> throw500 "expected webhook or webhook_from_env"
let headerConfs = fromMaybe [] mheaders
webhookInfo <- getWebhookInfoFromConf env webhookConf
headerInfos <- getHeaderInfosFromConf env headerConfs
let eTrigInfo = EventTriggerInfo name def rconf webhookInfo headerInfos reqTransform respTransform cleanupConfig
tabDep =
SchemaDependency
( SOSourceObj source $
AB.mkAnyBackend $
SOITable @b tableName
)
DRParent
pure (eTrigInfo, tabDep : getTrigDefDeps @b source tableName def)
buildEventTriggerInfo
env
source
tableName
( EventTriggerConf
name
def
webhook
webhookFromEnv
rconf
mheaders
reqTransform
respTransform
cleanupConfig
triggerOnReplication
) = do
webhookConf <- case (webhook, webhookFromEnv) of
(Just w, Nothing) -> return $ WCValue w
(Nothing, Just wEnv) -> return $ WCEnv wEnv
_ -> throw500 "expected webhook or webhook_from_env"
let headerConfs = fromMaybe [] mheaders
webhookInfo <- getWebhookInfoFromConf env webhookConf
headerInfos <- getHeaderInfosFromConf env headerConfs
let eTrigInfo =
EventTriggerInfo
name
def
rconf
webhookInfo
headerInfos
reqTransform
respTransform
cleanupConfig
triggerOnReplication
tabDep =
SchemaDependency
( SOSourceObj source $
AB.mkAnyBackend $
SOITable @b tableName
)
DRParent
pure (eTrigInfo, tabDep : getTrigDefDeps @b source tableName def)
getTrigDefDeps ::
forall b.

View File

@ -1052,6 +1052,7 @@ buildSchemaCacheRule logger env = proc (metadataNoDefaults, invalidationKeys) ->
where
buildEventTrigger = proc (tableInfo, (metadataInvalidationKey, source, sourceConfig, table, migrationRecreateEventTriggers, eventTriggerConf)) -> do
let triggerName = etcName eventTriggerConf
triggerOnReplication = etcTriggerOnReplication eventTriggerConf
metadataObject = mkEventTriggerMetadataObject @b (metadataInvalidationKey, source, sourceConfig, table, migrationRecreateEventTriggers, eventTriggerConf)
schemaObjectId =
SOSourceObj source $
@ -1093,6 +1094,7 @@ buildSchemaCacheRule logger env = proc (metadataNoDefaults, invalidationKeys) ->
table
tableColumns
triggerName
triggerOnReplication
(etcDefinition eventTriggerConf)
(_tciPrimaryKey tableInfo)
if isCatalogUpdate || migrationRecreateEventTriggers == RETRecreate
@ -1102,6 +1104,7 @@ buildSchemaCacheRule logger env = proc (metadataNoDefaults, invalidationKeys) ->
( table,
tableColumns,
triggerName,
triggerOnReplication,
etcDefinition eventTriggerConf,
sourceConfig,
(_tciPrimaryKey tableInfo)
@ -1116,6 +1119,7 @@ buildSchemaCacheRule logger env = proc (metadataNoDefaults, invalidationKeys) ->
table
(tableColumns, _tciPrimaryKey tableInfo)
triggerName
triggerOnReplication
(etcDefinition eventTriggerConf)
else returnA -< ()
else returnA -< ()
@ -1133,6 +1137,7 @@ buildSchemaCacheRule logger env = proc (metadataNoDefaults, invalidationKeys) ->
( tableName,
tableColumns,
triggerName,
triggerOnReplication,
triggerDefinition,
sourceConfig,
primaryKey
@ -1148,6 +1153,7 @@ buildSchemaCacheRule logger env = proc (metadataNoDefaults, invalidationKeys) ->
tableName
tableColumns
triggerName
triggerOnReplication
triggerDefinition
primaryKey

View File

@ -204,7 +204,7 @@ addEventTriggerToCatalog qt etc = liftTx do
False
where
QualifiedObject sn tn = qt
(EventTriggerConf name _ _ _ _ _ _ _ _) = etc
(EventTriggerConf name _ _ _ _ _ _ _ _ _) = etc
addComputedFieldToCatalog ::
MonadTx m =>

View File

@ -22,7 +22,7 @@ import Data.Typeable (Typeable)
import Hasura.Base.Error
import Hasura.Base.ToErrorValue
import Hasura.Prelude
import Hasura.RQL.Types.Common (SourceName)
import Hasura.RQL.Types.Common
import Hasura.RQL.Types.HealthCheckImplementation (HealthCheckImplementation)
import Hasura.RQL.Types.ResizePool (ServerReplicas)
import Hasura.SQL.Backend
@ -355,5 +355,8 @@ class
-- Resize source pools based on the count of server replicas
resizeSourcePools :: SourceConfig b -> ServerReplicas -> IO ()
-- Default behaviour of SQL triggers on logically replicated database
defaultTriggerOnReplication :: TriggerOnReplication
-- Prisms
$(makePrisms ''ComputedFieldReturnType)

View File

@ -44,6 +44,7 @@ module Hasura.RQL.Types.Common
RemoteRelationshipG (..),
rrDefinition,
rrName,
TriggerOnReplication (..),
)
where
@ -631,6 +632,25 @@ instance NFData ApolloFederationConfig
isApolloFedV1enabled :: Maybe ApolloFederationConfig -> Bool
isApolloFedV1enabled = isJust
-- | Type to indicate if the SQL trigger should be enabled
-- when data is inserted into a table through replication.
data TriggerOnReplication
= TOREnableTrigger
| TORDisableTrigger
deriving (Show, Eq, Generic)
instance NFData TriggerOnReplication
instance FromJSON TriggerOnReplication where
parseJSON = withBool "TriggerOnReplication" $ \case
True -> pure TOREnableTrigger
False -> pure TORDisableTrigger
instance ToJSON TriggerOnReplication where
toJSON = \case
TOREnableTrigger -> Bool True
TORDisableTrigger -> Bool False
--------------------------------------------------------------------------------
-- metadata

View File

@ -1,4 +1,5 @@
{-# LANGUAGE TemplateHaskell #-}
{-# LANGUAGE UndecidableInstances #-}
module Hasura.RQL.Types.EventTrigger
( SubscribeOpSpec (..),
@ -37,6 +38,7 @@ module Hasura.RQL.Types.EventTrigger
where
import Data.Aeson
import Data.Aeson.Extended ((.=?))
import Data.Aeson.TH
import Data.HashMap.Strict qualified as M
import Data.List.NonEmpty qualified as NE
@ -48,7 +50,7 @@ import Hasura.Prelude
import Hasura.RQL.DDL.Headers
import Hasura.RQL.DDL.Webhook.Transform (MetadataResponseTransform, RequestTransform)
import Hasura.RQL.Types.Backend
import Hasura.RQL.Types.Common (EnvRecord, InputWebhook, ResolvedWebhook, SourceName (..))
import Hasura.RQL.Types.Common (EnvRecord, InputWebhook, ResolvedWebhook, SourceName (..), TriggerOnReplication (..))
import Hasura.RQL.Types.Eventing
import Hasura.SQL.Backend
import System.Cron (CronSchedule)
@ -334,15 +336,44 @@ data EventTriggerConf (b :: BackendType) = EventTriggerConf
etcHeaders :: Maybe [HeaderConf],
etcRequestTransform :: Maybe RequestTransform,
etcResponseTransform :: Maybe MetadataResponseTransform,
etcCleanupConfig :: Maybe AutoTriggerLogCleanupConfig
etcCleanupConfig :: Maybe AutoTriggerLogCleanupConfig,
etcTriggerOnReplication :: TriggerOnReplication
}
deriving (Show, Eq, Generic)
instance Backend b => FromJSON (EventTriggerConf b) where
parseJSON = genericParseJSON hasuraJSON {omitNothingFields = True}
parseJSON = withObject "EventTriggerConf" \o -> do
name <- o .: "name"
definition <- o .: "definition"
webhook <- o .:? "webhook"
webhookFromEnv <- o .:? "webhook_from_env"
retryConf <- o .: "retry_conf"
headers <- o .:? "headers"
requestTransform <- o .:? "request_transform"
responseTransform <- o .:? "response_transform"
cleanupConfig <- o .:? "cleanup_config"
triggerOnReplication <- o .:? "trigger_on_replication" .!= defaultTriggerOnReplication @b
return $ EventTriggerConf name definition webhook webhookFromEnv retryConf headers requestTransform responseTransform cleanupConfig triggerOnReplication
instance Backend b => ToJSON (EventTriggerConf b) where
toJSON = genericToJSON hasuraJSON {omitNothingFields = True}
toJSON (EventTriggerConf name definition webhook webhookFromEnv retryConf headers requestTransform responseTransform cleanupConfig triggerOnReplication) =
object $
[ "name" .= name,
"definition" .= definition,
"retry_conf" .= retryConf
]
<> catMaybes
[ "webhook" .=? webhook,
"webhook_from_env" .=? webhookFromEnv,
"headers" .=? headers,
"request_transform" .=? requestTransform,
"response_transform" .=? responseTransform,
"cleanup_config" .=? cleanupConfig,
"trigger_on_replication"
.=? if triggerOnReplication == defaultTriggerOnReplication @b
then Nothing
else Just triggerOnReplication
]
updateCleanupConfig :: Maybe AutoTriggerLogCleanupConfig -> EventTriggerConf b -> EventTriggerConf b
updateCleanupConfig cleanupConfig etConf = etConf {etcCleanupConfig = cleanupConfig}
@ -413,7 +444,8 @@ data EventTriggerInfo (b :: BackendType) = EventTriggerInfo
etiHeaders :: [EventHeaderInfo],
etiRequestTransform :: Maybe RequestTransform,
etiResponseTransform :: Maybe MetadataResponseTransform,
etiCleanupConfig :: Maybe AutoTriggerLogCleanupConfig
etiCleanupConfig :: Maybe AutoTriggerLogCleanupConfig,
etiTriggerOnReplication :: TriggerOnReplication
}
deriving (Generic, Eq)

View File

@ -192,6 +192,7 @@ class Backend b => BackendEventTrigger (b :: BackendType) where
TableName b ->
([ColumnInfo b], Maybe (PrimaryKey b (ColumnInfo b))) ->
TriggerName ->
TriggerOnReplication ->
TriggerOpsDef b ->
m ()
@ -202,6 +203,7 @@ class Backend b => BackendEventTrigger (b :: BackendType) where
TableName b ->
[ColumnInfo b] ->
TriggerName ->
TriggerOnReplication ->
TriggerOpsDef b ->
-- TODO: Naveen: Find a better way to pass these extra, backend specific
-- parameters instead of adding a bunch of Maybes to the type class
@ -329,8 +331,8 @@ instance BackendEventTrigger ('Postgres 'Citus) where
dropDanglingSQLTrigger _ _ _ _ = throw400 NotSupported "Event triggers are not supported for Citus sources"
redeliverEvent _ _ = throw400 NotSupported "Event triggers are not supported for Citus sources"
unlockEventsInSource _ _ = runExceptT $ throw400 NotSupported "Event triggers are not supported for Citus sources"
createTableEventTrigger _ _ _ _ _ _ _ = runExceptT $ throw400 NotSupported "Event triggers are not supported for Citus sources"
createMissingSQLTriggers _ _ _ _ _ = throw400 NotSupported $ "Event triggers are not supported for Citus sources"
createTableEventTrigger _ _ _ _ _ _ _ _ = runExceptT $ throw400 NotSupported "Event triggers are not supported for Citus sources"
createMissingSQLTriggers _ _ _ _ _ _ = throw400 NotSupported $ "Event triggers are not supported for Citus sources"
checkIfTriggerExists _ _ _ = throw400 NotSupported $ "Event triggers are not supported for Citus sources"
addCleanupSchedules _ _ = throw400 NotSupported $ "Event triggers are not supported for Citus sources"
deleteAllScheduledCleanups _ _ = throw400 NotSupported $ "Event triggers are not supported for Citus sources"
@ -398,8 +400,8 @@ instance BackendEventTrigger 'BigQuery where
dropDanglingSQLTrigger _ _ _ _ = throw400 NotSupported "Event triggers are not supported for BigQuery sources"
redeliverEvent _ _ = throw400 NotSupported "Event triggers are not supported for BigQuery sources"
unlockEventsInSource _ _ = runExceptT $ throw400 NotSupported "Event triggers are not supported for BigQuery sources"
createTableEventTrigger _ _ _ _ _ _ _ = runExceptT $ throw400 NotSupported "Event triggers are not supported for BigQuery sources"
createMissingSQLTriggers _ _ _ _ _ = throw400 NotSupported $ "Event triggers are not supported for BigQuery sources"
createTableEventTrigger _ _ _ _ _ _ _ _ = runExceptT $ throw400 NotSupported "Event triggers are not supported for BigQuery sources"
createMissingSQLTriggers _ _ _ _ _ _ = throw400 NotSupported $ "Event triggers are not supported for BigQuery sources"
checkIfTriggerExists _ _ _ = throw400 NotSupported $ "Event triggers are not supported for BigQuery sources"
addCleanupSchedules _ _ = throw400 NotSupported $ "Event triggers are not supported for BigQuery sources"
deleteAllScheduledCleanups _ _ = throw400 NotSupported $ "Event triggers are not supported for BigQuery sources"
@ -421,8 +423,8 @@ instance BackendEventTrigger 'MySQL where
dropDanglingSQLTrigger _ _ _ _ = throw400 NotSupported "Event triggers are not supported for MySQL sources"
redeliverEvent _ _ = throw400 NotSupported "Event triggers are not supported for MySQL sources"
unlockEventsInSource _ _ = runExceptT $ throw400 NotSupported "Event triggers are not supported for MySQL sources"
createTableEventTrigger _ _ _ _ _ _ _ = runExceptT $ throw400 NotSupported "Event triggers are not supported for MySQL sources"
createMissingSQLTriggers _ _ _ _ _ = throw400 NotSupported $ "Event triggers are not supported for MySQL sources"
createTableEventTrigger _ _ _ _ _ _ _ _ = runExceptT $ throw400 NotSupported "Event triggers are not supported for MySQL sources"
createMissingSQLTriggers _ _ _ _ _ _ = throw400 NotSupported $ "Event triggers are not supported for MySQL sources"
checkIfTriggerExists _ _ _ = throw400 NotSupported $ "Event triggers are not supported for MySQL sources"
addCleanupSchedules _ _ = throw400 NotSupported $ "Event triggers are not supported for MySQL sources"
deleteAllScheduledCleanups _ _ = throw400 NotSupported $ "Event triggers are not supported for MySQL sources"
@ -461,9 +463,9 @@ instance BackendEventTrigger 'DataConnector where
throw400 NotSupported "Event triggers are not supported for the Data Connector backend."
unlockEventsInSource _ _ =
runExceptT $ throw400 NotSupported "Event triggers are not supported for the Data Connector backend."
createTableEventTrigger _ _ _ _ _ _ _ =
createTableEventTrigger _ _ _ _ _ _ _ _ =
runExceptT $ throw400 NotSupported "Event triggers are not supported for the Data Connector backend."
createMissingSQLTriggers _ _ _ _ _ = throw400 NotSupported $ "Event triggers are not supported for Data Connector backend."
createMissingSQLTriggers _ _ _ _ _ _ = throw400 NotSupported $ "Event triggers are not supported for Data Connector backend."
checkIfTriggerExists _ _ _ = throw400 NotSupported $ "Event triggers are not supported for Data Connector backend."
addCleanupSchedules _ _ = throw400 NotSupported $ "Event triggers are not supported for Data Connector backend."
deleteAllScheduledCleanups _ _ = throw400 NotSupported $ "Event triggers are not supported for Data Connector backend."

View File

@ -36,7 +36,7 @@ import Hasura.RQL.Types.Action
)
import Hasura.RQL.Types.Allowlist (AllowlistEntry (..), MetadataAllowlist)
import Hasura.RQL.Types.ApiLimit (ApiLimit, emptyApiLimit)
import Hasura.RQL.Types.Backend (Backend)
import Hasura.RQL.Types.Backend (Backend, defaultTriggerOnReplication)
import Hasura.RQL.Types.Column (ColumnValues)
import Hasura.RQL.Types.Common (Comment, MetricsConfig, RemoteRelationshipG (..), commentToMaybeText, defaultActionTimeoutSecs, emptyMetricsConfig)
import Hasura.RQL.Types.CustomTypes
@ -328,21 +328,26 @@ sourcesToOrdJSONList sources =
]
<> catMaybes [maybeCommentToMaybeOrdPair comment]
eventTriggerConfToOrdJSON :: Backend b => EventTriggerConf b -> AO.Value
eventTriggerConfToOrdJSON (EventTriggerConf name definition webhook webhookFromEnv retryConf headers reqTransform respTransform cleanupConfig) =
AO.object $
[ ("name", AO.toOrdered name),
("definition", AO.toOrdered definition),
("retry_conf", AO.toOrdered retryConf)
]
<> catMaybes
[ maybeAnyToMaybeOrdPair "webhook" AO.toOrdered webhook,
maybeAnyToMaybeOrdPair "webhook_from_env" AO.toOrdered webhookFromEnv,
headers >>= listToMaybeOrdPair "headers" AO.toOrdered,
fmap (("request_transform",) . AO.toOrdered) reqTransform,
fmap (("response_transform",) . AO.toOrdered) respTransform,
maybeAnyToMaybeOrdPair "cleanup_config" AO.toOrdered cleanupConfig
eventTriggerConfToOrdJSON :: forall b. Backend b => EventTriggerConf b -> AO.Value
eventTriggerConfToOrdJSON (EventTriggerConf name definition webhook webhookFromEnv retryConf headers reqTransform respTransform cleanupConfig triggerOnReplication) =
let triggerOnReplicationMaybe =
if triggerOnReplication == defaultTriggerOnReplication @b
then Nothing
else Just triggerOnReplication
in AO.object $
[ ("name", AO.toOrdered name),
("definition", AO.toOrdered definition),
("retry_conf", AO.toOrdered retryConf)
]
<> catMaybes
[ maybeAnyToMaybeOrdPair "webhook" AO.toOrdered webhook,
maybeAnyToMaybeOrdPair "webhook_from_env" AO.toOrdered webhookFromEnv,
headers >>= listToMaybeOrdPair "headers" AO.toOrdered,
fmap (("request_transform",) . AO.toOrdered) reqTransform,
fmap (("response_transform",) . AO.toOrdered) respTransform,
maybeAnyToMaybeOrdPair "cleanup_config" AO.toOrdered cleanupConfig,
maybeAnyToMaybeOrdPair "trigger_on_replication" AO.toOrdered triggerOnReplicationMaybe
]
functionMetadataToOrdJSON :: Backend b => FunctionMetadata b -> AO.Value
functionMetadataToOrdJSON FunctionMetadata {..} =

View File

@ -15,7 +15,7 @@ import Hasura.Backends.Postgres.Connection
import Hasura.Base.Error
import Hasura.Prelude
import Hasura.RQL.Types.Backend (Backend)
import Hasura.RQL.Types.Common (InputWebhook)
import Hasura.RQL.Types.Common (InputWebhook, TriggerOnReplication (..))
import Hasura.RQL.Types.EventTrigger
import Hasura.SQL.Backend
import Hasura.Server.Migrate.Version
@ -75,8 +75,8 @@ from3To4 = liftTx $
) ->
EventTriggerConf ('Postgres 'Vanilla)
uncurryEventTrigger (trn, PG.ViaJSON tDef, w, nr, rint, PG.ViaJSON headers) =
EventTriggerConf trn tDef (Just w) Nothing (RetryConf nr rint Nothing) headers Nothing Nothing Nothing
updateEventTrigger3To4 etc@(EventTriggerConf name _ _ _ _ _ _ _ _) =
EventTriggerConf trn tDef (Just w) Nothing (RetryConf nr rint Nothing) headers Nothing Nothing Nothing TORDisableTrigger
updateEventTrigger3To4 etc@(EventTriggerConf name _ _ _ _ _ _ _ _ _) =
PG.unitQ
[PG.sql|
UPDATE hdb_catalog.event_triggers

View File

@ -1,11 +1,12 @@
CREATE OR ALTER TRIGGER #{qualifiedTriggerName}
ON #{qualifiedTableName}
AFTER DELETE
#{replicationClause}
AS
BEGIN
DECLARE @json NVARCHAR(MAX)
SET @json = (
SELECT
SELECT
#{deliveryColsSQLExpression}, NULL as [payload.data.new],
'#{operation}' as [payload.op],
'#{schemaName}' as [schema_name],

View File

@ -1,11 +1,12 @@
CREATE OR ALTER TRIGGER #{qualifiedTriggerName}
ON #{qualifiedTableName}
AFTER INSERT
#{replicationClause}
AS
BEGIN
DECLARE @json NVARCHAR(MAX)
SET @json = (
SELECT
SELECT
#{deliveryColsSQLExpression}, NULL as [payload.data.old],
'#{operation}' as [payload.op],
'#{schemaName}' as [schema_name],

View File

@ -1,6 +1,7 @@
CREATE OR ALTER TRIGGER #{qualifiedTriggerName}
ON #{qualifiedTableName}
AFTER UPDATE
#{replicationClause}
AS
BEGIN
DECLARE @json_pk_not_updated NVARCHAR(MAX)
@ -8,8 +9,8 @@ DECLARE @json_pk_updated NVARCHAR(MAX)
-- When primary key is not updated during a UPDATE transaction then construct both
-- 'data.old' and 'data.new'.
SET @json_pk_not_updated =
(SELECT
SET @json_pk_not_updated =
(SELECT
#{oldDeliveryColsSQLExp}, #{newDeliveryColsSQLExp},
'#{operation}' as [payload.op],
'#{schemaName}' as [schema_name],
@ -40,7 +41,7 @@ IF (#{isPrimaryKeyInListenColumnsExp})
-- table whose primary key does not match to any rows present in DELETED
-- table. When such an situation occurs during a UPDATE transaction, then
-- this means that the primary key of the row was updated.
(SELECT
(SELECT
#{oldDeliveryColsSQLExpWhenPrimaryKeyUpdated}, #{newDeliveryColsSQLExpWhenPrimaryKeyUpdated},
'#{operation}' as [payload.op],
'#{schemaName}' as [schema_name],