server: fix next_retry_at for MSSQL event triggers

PR-URL: https://github.com/hasura/graphql-engine-mono/pull/7376
Co-authored-by: Karthikeyan Chinnakonda <15602904+codingkarthik@users.noreply.github.com>
GitOrigin-RevId: 3a287271320fbb489dbabcd163b1b474fa5132b6
This commit is contained in:
pranshi06 2023-01-20 16:45:04 +05:30 committed by hasura-bot
parent 0270dbf4b4
commit ae5f3fe593
9 changed files with 262 additions and 39 deletions

View File

@ -42,6 +42,7 @@ library
, directory
, fast-logger
, filepath
, graphql-engine
, graphql-parser
, hasura-prelude
, hspec
@ -109,6 +110,7 @@ library
Test.DataConnector.SelectPermissionsSpec
Test.EventTriggers.EventTriggersSpecialCharactersSpec
Test.EventTriggers.MSSQL.EventTriggerDropSourceCleanupSpec
Test.EventTriggers.MSSQL.EventTriggerNextRetryAtTimezoneSpec
Test.EventTriggers.MSSQL.EventTriggersForReplicationSpec
Test.EventTriggers.MSSQL.EventTriggersUniqueNameSpec
Test.EventTriggers.MSSQL.EventTriggersNameQuotingSpec

View File

@ -0,0 +1,190 @@
{-# LANGUAGE QuasiQuotes #-}
{-# LANGUAGE ViewPatterns #-}
-- | Test that event trigger retry/deliver
module Test.EventTriggers.MSSQL.EventTriggerNextRetryAtTimezoneSpec (spec) where
import Control.Concurrent.Chan qualified as Chan
import Control.Concurrent.Extended (sleep)
import Data.List.NonEmpty qualified as NE
import Harness.Backend.Sqlserver qualified as Sqlserver
import Harness.GraphqlEngine qualified as GraphqlEngine
import Harness.Quoter.Yaml
import Harness.Test.Fixture qualified as Fixture
import Harness.Test.Schema (Table (..), table)
import Harness.Test.Schema qualified as Schema
import Harness.Test.SetupAction (permitTeardownFail)
import Harness.TestEnvironment (GlobalTestEnvironment, TestEnvironment)
import Harness.Webhook qualified as Webhook
import Harness.Yaml (shouldBeYaml, shouldReturnYaml)
import Hasura.Prelude
import System.Timeout (timeout)
import Test.HUnit.Base (assertFailure)
import Test.Hspec (SpecWith, describe, it)
--------------------------------------------------------------------------------
-- Preamble
spec :: SpecWith GlobalTestEnvironment
spec =
Fixture.runWithLocalTestEnvironment
( NE.fromList
[ (Fixture.fixture $ Fixture.Backend Sqlserver.backendTypeMetadata)
{ -- setup the webhook server as the local test environment,
-- so that the server can be referenced while testing
Fixture.mkLocalTestEnvironment = const Webhook.run,
Fixture.setupTeardown = \(testEnvironment, (webhookServer, _)) ->
[ permitTeardownFail (Sqlserver.setupTablesAction (schema "authors") testEnvironment),
Fixture.SetupAction
{ Fixture.setupAction = mssqlSetupWithEventTriggers testEnvironment webhookServer,
Fixture.teardownAction = \_ -> mssqlTeardown testEnvironment
}
]
}
]
)
tests
--------------------------------------------------------------------------------
-- * Backend
-- ** Schema
schema :: Text -> [Schema.Table]
schema authorTableName = [authorsTable authorTableName]
authorsTable :: Text -> Schema.Table
authorsTable tableName =
(table tableName)
{ tableColumns =
[ Schema.column "id" Schema.TInt,
Schema.column "name" Schema.TStr
],
tablePrimaryKey = ["id"],
tableData =
[ [Schema.VInt 1, Schema.VStr "Author 1"],
[Schema.VInt 2, Schema.VStr "Author 2"]
]
}
--------------------------------------------------------------------------------
-- Tests
tests :: Fixture.Options -> SpecWith (TestEnvironment, (GraphqlEngine.Server, Webhook.EventsQueue))
tests opts = do
nextRetryAtTimezoneChange opts
nextRetryAtTimezoneChange :: Fixture.Options -> SpecWith (TestEnvironment, (GraphqlEngine.Server, Webhook.EventsQueue))
nextRetryAtTimezoneChange opts =
describe "event trigger retries if the event trigger is undelivered (and retries are available)" do
-- The test checks that the event trigger retries as expected. In the test, we fire up the event trigger by adding a
-- row to the table. We wait for a few seconds so the event has retried completely and then see if the number of
-- retries are 2 (the event retries once)
it "check: the total number of tries is (number of retries + 1)" $
\(testEnvironment, (_, (Webhook.EventsQueue eventsQueue))) -> do
let schemaName :: Schema.SchemaName
schemaName = Schema.getSchemaName testEnvironment
insertQuery =
-- Add row to the table `author` (which will fire up the event trigger)
[interpolateYaml|
type: mssql_run_sql
args:
source: mssql
sql: "INSERT INTO #{schemaName}.authors (id, name) values (3, N'john')"
|]
-- get the `tries` column to see if the event was retried
selectQuery =
[interpolateYaml|
type: mssql_run_sql
args:
source: mssql
sql: "SELECT tries FROM hdb_catalog.event_log"
|]
expectedResponse =
[yaml|
result_type: CommandOk
result: null
|]
expectedEventPayload =
[yaml|
old: null
new:
name: john
id: 3
|]
expectedTotalTries =
[yaml|
result_type: TuplesOk
result: [
[tries], [2]
]
|]
-- Insert a row into the table with event trigger
shouldReturnYaml
opts
(GraphqlEngine.postV2Query 200 testEnvironment insertQuery)
expectedResponse
-- Check if there was a payload generated due to the insert statement
eventPayload <-
-- wait for the event for a maximum of 5 seconds
timeout (5 * 1000000) (Chan.readChan eventsQueue)
>>= (`onNothing` (assertFailure "Event expected, but not fired"))
eventPayload `shouldBeYaml` expectedEventPayload
-- Wait for 15 seconds to make sure that the event trigger has been retried
sleep $ seconds 10
-- Check the retries column of hdb_catalog.event_log table to see that the event has been retried once (that is
-- the event has tried to deliver 2 times in total)
shouldReturnYaml
opts
(GraphqlEngine.postV2Query 200 testEnvironment selectQuery)
expectedTotalTries
--------------------------------------------------------------------------------
-- ** Setup and teardown override
mssqlSetupWithEventTriggers :: TestEnvironment -> GraphqlEngine.Server -> IO ()
mssqlSetupWithEventTriggers testEnvironment webhookServer = do
let schemaName :: Schema.SchemaName
schemaName = Schema.getSchemaName testEnvironment
webhookServerNextRetryEndpoint = GraphqlEngine.serverUrl webhookServer ++ "/nextRetry"
GraphqlEngine.postMetadata_ testEnvironment $
[interpolateYaml|
type: bulk
args:
- type: mssql_create_event_trigger
args:
name: authors_all
source: mssql
table:
name: authors
schema: #{schemaName}
retry_conf:
num_retries: 1
interval_sec: 5
webhook: #{webhookServerNextRetryEndpoint}
insert:
columns: "*"
|]
mssqlTeardown :: TestEnvironment -> IO ()
mssqlTeardown testEnvironment = do
GraphqlEngine.postMetadata_ testEnvironment $
[yaml|
type: bulk
args:
- type: mssql_delete_event_trigger
args:
name: authors_all
source: mssql
|]

View File

@ -17,6 +17,7 @@ import Harness.TestEnvironment (Server (..), serverUrl)
import Hasura.Base.Error (iResultToMaybe)
import Hasura.Prelude
import Hasura.Server.Utils (executeJSONPath)
import Network.HTTP.Client.Transformable qualified as HTTP
import Network.Socket qualified as Socket
import Network.Wai.Extended qualified as Wai
import Network.Wai.Handler.Warp qualified as Warp
@ -31,6 +32,9 @@ newtype EventsQueue = EventsQueue (Chan.Chan Aeson.Value)
-- - GET on @/@, which returns a simple 200 OK;
-- - POST on @/echo@, which extracts the event data from the body
-- of the request and inserts it into the `EventsQueue`.
-- - POST on @/nextRetry@, which extracts the event data from the body
-- of the request and inserts it into the `EventsQueue` and returns 503
-- error code.
--
-- This function performs a health check, using a GET on /, to ensure that the
-- server was started correctly, and will throw an exception if the health check
@ -43,6 +47,18 @@ run = mkTestResource do
port <- bracket (Warp.openFreePort) (Socket.close . snd) (pure . fst)
eventsQueueChan <- Chan.newChan
let eventsQueue = EventsQueue eventsQueueChan
extractEventDataInsertIntoEventQueue = do
req <- Spock.request
body <- liftIO $ Wai.strictRequestBody req
let jsonBody = Aeson.decode body
let eventDataPayload =
-- Only extract the data payload from the request body
let mkJSONPathE = either (error . T.unpack) id . parseJSONPath
eventJSONPath = mkJSONPathE "$.event.data"
in iResultToMaybe =<< executeJSONPath eventJSONPath <$> jsonBody
liftIO $
Chan.writeChan eventsQueueChan $
fromMaybe (error "error in parsing the event data from the body") eventDataPayload
thread <- Async.async $
Spock.runSpockNoBanner port $
Spock.spockT id $ do
@ -53,19 +69,12 @@ run = mkTestResource do
Spock.json $
Aeson.String "world"
Spock.post "/echo" $ do
req <- Spock.request
body <- liftIO $ Wai.strictRequestBody req
let jsonBody = Aeson.decode body
let eventDataPayload =
-- Only extract the data payload from the request body
let mkJSONPathE = either (error . T.unpack) id . parseJSONPath
eventJSONPath = mkJSONPathE "$.event.data"
in iResultToMaybe =<< executeJSONPath eventJSONPath <$> jsonBody
liftIO $
Chan.writeChan eventsQueueChan $
fromMaybe (error "error in parsing the event data from the body") eventDataPayload
Spock.setHeader "Content-Type" "application/json; charset=utf-8"
extractEventDataInsertIntoEventQueue
Spock.json $ Aeson.object ["success" Aeson..= True]
Spock.post "/nextRetry" $ do
extractEventDataInsertIntoEventQueue
Spock.setStatus HTTP.status503
let server = Server {port = fromIntegral port, urlPrefix, thread}
pure
AcquiredResource

View File

@ -384,9 +384,9 @@ setErrorTx event = \case
WHERE id = $eventId
|]
-- See Note [UTCTIME not supported in SQL Server]
setRetryTx :: Event 'MSSQL -> UTCTime -> MaintenanceMode MaintenanceModeVersion -> TxE QErr ()
setRetryTx event utcTime maintenanceMode = do
-- since `convertUTCToDatetime2` uses utc as timezone, it will not affect the value
time <- convertUTCToDatetime2 utcTime
case maintenanceMode of
(MaintenanceModeEnabled PreviousMMVersion) -> throw500 "unexpected: there is no previous maintenance mode version supported for MSSQL event triggers"
@ -397,11 +397,13 @@ setRetryTx event utcTime maintenanceMode = do
-- NOTE: Naveen: The following method to convert from Datetime to Datetimeoffset was
-- taken from https://stackoverflow.com/questions/17866311/how-to-cast-datetime-to-datetimeoffset
latestVersionSetRetry time =
-- `time` is in UTC (without the timezone offset). The function TODATETIMEOFFSET adds the offset 00:00 (UTC) to
-- `time`, which collectively represents the value present in next_retry_at
unitQueryE
HGE.defaultMSSQLTxErrorHandler
[ODBC.sql|
UPDATE hdb_catalog.event_log
SET next_retry_at = TODATETIMEOFFSET ($time, DATEPART(TZOFFSET, SYSDATETIMEOFFSET())), locked = NULL
SET next_retry_at = TODATETIMEOFFSET ($time, 0), locked = NULL
WHERE id = $eventId
|]
@ -594,22 +596,9 @@ getMaintenanceModeVersionTx = do
<> " but received "
<> tshow catalogVersion
-- | Note: UTCTIME not supported in SQL Server
--
-- Refer 'ToSql UTCTIME' instance of odbc package:
-- https://github.com/fpco/odbc/blob/f4f04ea15d14e9a3ed455f7c728dc08734eef8ae/src/Database/ODBC/SQLServer.hs#L377
--
-- We use SYSDATETIMEOFFSET() to store time values along with it's time
-- zone offset in event_log table. Since ODBC server does not support time zones,
-- we use a workaround.
--
-- We wrap the time value in Datetime2, but before we insert it into the
-- event_log table we convert it into UTCTIME using the 'TODATETIMEOFFSET()'
-- sql function.
convertUTCToDatetime2 :: MonadIO m => UTCTime -> m Datetime2
convertUTCToDatetime2 utcTime = do
timezone <- liftIO $ getTimeZone utcTime
let localTime = utcToLocalTime timezone utcTime
let localTime = utcToLocalTime utc utcTime
return $ Datetime2 localTime
checkIfTriggerExistsQ ::
@ -932,8 +921,7 @@ addCleanupSchedules ::
addCleanupSchedules sourceConfig triggersWithcleanupConfig =
unless (null triggersWithcleanupConfig) $ do
currTimeUTC <- liftIO getCurrentTime
timeZone <- liftIO $ getTimeZone currTimeUTC
let currTime = utcToZonedTime timeZone currTimeUTC
let currTime = utcToZonedTime utc currTimeUTC
triggerNames = map fst triggersWithcleanupConfig
allScheduledCleanupsInDB <- liftEitherM $ liftIO $ runMSSQLSourceWriteTx sourceConfig $ selectLastCleanupScheduledTimestamp triggerNames
let triggerMap = Map.fromList $ allScheduledCleanupsInDB
@ -945,7 +933,7 @@ addCleanupSchedules sourceConfig triggersWithcleanupConfig =
Just (count, lastTime) -> if count < 5 then (Just lastTime) else Nothing
in fmap
( \lastScheduledTimestamp ->
(tName, map (Datetimeoffset . utcToZonedTime timeZone) $ generateScheduleTimes (zonedTimeToUTC lastScheduledTimestamp) cleanupSchedulesToBeGenerated (_atlccSchedule cConfig))
(tName, map (Datetimeoffset . utcToZonedTime utc) $ generateScheduleTimes (zonedTimeToUTC lastScheduledTimestamp) cleanupSchedulesToBeGenerated (_atlccSchedule cConfig))
)
lastScheduledTime
)
@ -1032,7 +1020,7 @@ getCleanupEventsForDeletionTx = do
SELECT id, trigger_name, ROW_NUMBER()
OVER(PARTITION BY trigger_name ORDER BY scheduled_at DESC) AS rn
FROM hdb_catalog.hdb_event_log_cleanups
WHERE status = 'scheduled' AND scheduled_at < CURRENT_TIMESTAMP
WHERE status = 'scheduled' AND scheduled_at < SYSDATETIMEOFFSET() AT TIME ZONE 'UTC'
) AS a
WHERE rn = 1
|]
@ -1045,7 +1033,7 @@ getCleanupEventsForDeletionTx = do
( rawUnescapedText
[ST.st|
SELECT CAST(id AS nvarchar(36)) FROM hdb_catalog.hdb_event_log_cleanups
WHERE status = 'scheduled' AND scheduled_at < CURRENT_TIMESTAMP AND id NOT IN
WHERE status = 'scheduled' AND scheduled_at < SYSDATETIMEOFFSET() AT TIME ZONE 'UTC' AND id NOT IN
(SELECT n from (VALUES #{cleanupIDsSQLValue}) AS X(n));
|]
)
@ -1145,7 +1133,7 @@ deleteEventTriggerLogsTx TriggerLogCleanupConfig {..} = do
[ODBC.sql|
SELECT TOP ($qBatchSize) CAST(id AS nvarchar(36)) FROM hdb_catalog.event_log WITH (UPDLOCK, READPAST)
WHERE ((delivered = 1 OR error = 1) AND trigger_name = $qTriggerName )
AND created_at < DATEADD(HOUR, - $qRetentionPeriod, CURRENT_TIMESTAMP)
AND created_at < DATEADD(HOUR, - $qRetentionPeriod, SYSDATETIMEOFFSET() AT TIME ZONE 'UTC')
AND locked IS NULL
|]
if null deadEventIDs
@ -1157,7 +1145,7 @@ deleteEventTriggerLogsTx TriggerLogCleanupConfig {..} = do
rawUnescapedText $
[ST.st|
UPDATE hdb_catalog.event_log
SET locked = CURRENT_TIMESTAMP
SET locked = SYSDATETIMEOFFSET() AT TIME ZONE 'UTC'
WHERE id = ANY ( SELECT id from (VALUES #{eventIdsValues}) AS X(id))
AND locked IS NULL
|]

View File

@ -25,7 +25,7 @@ initialSourceCatalogVersion :: SourceCatalogVersion
initialSourceCatalogVersion = Version.SourceCatalogVersion 1
latestSourceCatalogVersion :: SourceCatalogVersion
latestSourceCatalogVersion = Version.SourceCatalogVersion 3
latestSourceCatalogVersion = Version.SourceCatalogVersion 4
previousSourceCatalogVersions :: [SourceCatalogVersion]
previousSourceCatalogVersions = [initialSourceCatalogVersion .. pred latestSourceCatalogVersion]

View File

@ -25,6 +25,12 @@ CREATE TABLE hdb_catalog.hdb_source_catalog_version(
CREATE UNIQUE INDEX hdb_source_catalog_version_one_row
ON hdb_catalog.hdb_source_catalog_version((version IS NOT NULL));
/* TODO: The columns `created_at` and `next_retry_at` does not contain timezone (TIMESTAMP type) while `locked` has a timezone
offset (TIMESTAMPTZ). The time repesented by TIMESTAMP is in the timezone of the Postgres server. If the
timezone of the PG server is changed, then the entries in the event_log table can be confusing since there is no
timezone offset to highlight the difference. A possible solution to it is to change the type of the two columns to
include the timezone offset and keep all the times in UTC. However, altering a column type is a time
taking process, hence not migrating the source to add a timezone offset */
CREATE TABLE hdb_catalog.event_log
(
id TEXT DEFAULT hdb_catalog.gen_hasura_uuid() PRIMARY KEY,

View File

@ -16,7 +16,7 @@ CREATE TABLE hdb_catalog.event_log
delivered BIT NOT NULL DEFAULT 0,
error BIT NOT NULL DEFAULT 0,
tries INTEGER NOT NULL DEFAULT 0,
created_at DATETIMEOFFSET(7) NOT NULL DEFAULT SYSDATETIMEOFFSET(),
created_at DATETIMEOFFSET(7) NOT NULL DEFAULT SYSDATETIMEOFFSET() AT TIME ZONE 'UTC',
locked DATETIMEOFFSET(7),
next_retry_at DATETIMEOFFSET(7),
archived BIT NOT NULL DEFAULT 0
@ -39,7 +39,7 @@ CREATE TABLE hdb_catalog.event_invocation_logs (
status INTEGER,
request NVARCHAR(MAX),
response NVARCHAR(MAX),
created_at DATETIMEOFFSET(7) NOT NULL DEFAULT SYSDATETIMEOFFSET()
created_at DATETIMEOFFSET(7) NOT NULL DEFAULT SYSDATETIMEOFFSET() AT TIME ZONE 'UTC'
);
/* This index improves the performance of deletes by event_id, so that if somebody

View File

@ -1,5 +1,5 @@
UPDATE hdb_catalog.event_log
SET locked = SYSDATETIMEOFFSET()
SET locked = SYSDATETIMEOFFSET() AT TIME ZONE 'UTC'
OUTPUT CONVERT(varchar(MAX), inserted.id), inserted.schema_name, inserted.table_name, inserted.trigger_name, inserted.payload, inserted.tries, CONVERT(varchar(MAX), inserted.created_at), CONVERT(varchar(MAX), inserted.next_retry_at)
WHERE id in
(SELECT TOP(#{fetchBatchSize}) l.id

View File

@ -0,0 +1,28 @@
-- created_at column previously had a default value in localtime which can result in timezone discrepancy, we are
-- changing it to UTC time.
-- Please note that we don't know the name of the default value constraint, that is why the query first fetches the name
-- from sys.default_constraints and then removes the default value constraint. Then we add a different default value to
-- the column created_at.
DECLARE @EventLogConstraintName nvarchar(200)
SELECT @EventLogConstraintName = name
FROM sys.default_constraints
WHERE parent_object_id = object_id('hdb_catalog.event_log')
AND parent_column_id = columnproperty(object_id('hdb_catalog.event_log'), 'created_at', 'ColumnId')
IF @EventLogConstraintName IS NOT NULL
EXEC('ALTER TABLE hdb_catalog.event_log DROP CONSTRAINT ' + @EventLogConstraintName)
ALTER TABLE hdb_catalog.event_log ADD CONSTRAINT event_log_created_at_default_value DEFAULT SYSDATETIMEOFFSET() AT TIME ZONE 'UTC' FOR created_at;
-- Please note that we don't know the name of the default value constraint, that is why the query first fetches the name
-- from sys.default_constraints and then removes the default value constraint. Then we add a different default value to
-- the column created_at.
DECLARE @EventInvocationLogConstraintName nvarchar(200)
SELECT @EventInvocationLogConstraintName = name
FROM sys.default_constraints
WHERE parent_object_id = object_id('hdb_catalog.event_invocation_logs')
AND parent_column_id = columnproperty(object_id('hdb_catalog.event_invocation_logs'), 'created_at', 'ColumnId')
IF @EventInvocationLogConstraintName IS NOT NULL
EXEC('ALTER TABLE hdb_catalog.event_invocation_logs DROP CONSTRAINT ' + @EventInvocationLogConstraintName)
ALTER TABLE hdb_catalog.event_invocation_logs ADD CONSTRAINT event_invocation_logs_created_at_default_value DEFAULT SYSDATETIMEOFFSET() AT TIME ZONE 'UTC' FOR created_at;