mirror of
https://github.com/hasura/graphql-engine.git
synced 2024-12-15 09:22:43 +03:00
Only schedule cleanup for event triggers if any exist.
Users were seeing spurious log entries saying that cleanup could not be scheduled for BigQuery sources as event triggers are not supported. This makes the error go away by enforcing non-emptyness on the list of triggers, which means we cannot call the function throwing the error, as there will never be any event triggers for which to schedule cleanup. PR-URL: https://github.com/hasura/graphql-engine-mono/pull/10987 GitOrigin-RevId: ce9bd6a340bbc8f45f4c74cf9c69f65c4ee91bf5
This commit is contained in:
parent
e1bceb4b8e
commit
9c88a3a16f
@ -73,13 +73,13 @@ import Text.Builder qualified as TB
|
||||
import Text.Shakespeare.Text qualified as ST
|
||||
|
||||
-- | creates a SQL Values list from haskell list (('123-abc'), ('456-vgh'), ('234-asd'))
|
||||
generateSQLValuesFromList :: (ToTxt a) => [a] -> Text
|
||||
generateSQLValuesFromList :: (Functor f, Foldable f, ToTxt a) => f a -> Text
|
||||
generateSQLValuesFromList = generateSQLValuesFromListWith (\t -> "'" <> toTxt t <> "'")
|
||||
|
||||
generateSQLValuesFromListWith :: (a -> Text) -> [a] -> Text
|
||||
generateSQLValuesFromListWith :: (Functor f, Foldable f) => (a -> Text) -> f a -> Text
|
||||
generateSQLValuesFromListWith f events = commaSeparated values
|
||||
where
|
||||
values = map (\e -> "(" <> f e <> ")") events
|
||||
values = fmap (\e -> "(" <> f e <> ")") events
|
||||
|
||||
fetchUndeliveredEvents ::
|
||||
(MonadIO m, MonadError QErr m) =>
|
||||
@ -905,16 +905,16 @@ mkUpdateTriggerQuery
|
||||
addCleanupSchedules ::
|
||||
(MonadIO m, MonadError QErr m) =>
|
||||
MSSQLSourceConfig ->
|
||||
[(TriggerName, AutoTriggerLogCleanupConfig)] ->
|
||||
NonEmpty (TriggerName, AutoTriggerLogCleanupConfig) ->
|
||||
m ()
|
||||
addCleanupSchedules sourceConfig triggersWithcleanupConfig =
|
||||
unless (null triggersWithcleanupConfig) $ do
|
||||
addCleanupSchedules sourceConfig triggersWithCleanupConfig =
|
||||
unless (null triggersWithCleanupConfig) $ do
|
||||
currTimeUTC <- liftIO getCurrentTime
|
||||
let currTime = utcToZonedTime utc currTimeUTC
|
||||
triggerNames = map fst triggersWithcleanupConfig
|
||||
triggerNames = fmap fst triggersWithCleanupConfig
|
||||
allScheduledCleanupsInDB <- liftEitherM $ liftIO $ runMSSQLSourceWriteTx sourceConfig $ selectLastCleanupScheduledTimestamp triggerNames
|
||||
let triggerMap = HashMap.fromList $ allScheduledCleanupsInDB
|
||||
scheduledTriggersAndTimestamps =
|
||||
scheduledTriggersAndTimestampsMaybe =
|
||||
mapMaybe
|
||||
( \(tName, cConfig) ->
|
||||
let lastScheduledTime = case HashMap.lookup tName triggerMap of
|
||||
@ -926,15 +926,18 @@ addCleanupSchedules sourceConfig triggersWithcleanupConfig =
|
||||
)
|
||||
lastScheduledTime
|
||||
)
|
||||
triggersWithcleanupConfig
|
||||
unless (null scheduledTriggersAndTimestamps)
|
||||
$ liftEitherM
|
||||
$ liftIO
|
||||
$ runMSSQLSourceWriteTx sourceConfig
|
||||
$ insertEventTriggerCleanupLogsTx scheduledTriggersAndTimestamps
|
||||
(toList triggersWithCleanupConfig)
|
||||
fmap (fromMaybe ())
|
||||
$ for
|
||||
(nonEmpty scheduledTriggersAndTimestampsMaybe)
|
||||
( liftEitherM
|
||||
. liftIO
|
||||
. runMSSQLSourceWriteTx sourceConfig
|
||||
. insertEventTriggerCleanupLogsTx
|
||||
)
|
||||
|
||||
-- | Insert the cleanup logs for the given trigger name and schedules
|
||||
insertEventTriggerCleanupLogsTx :: [(TriggerName, [Datetimeoffset])] -> TxET QErr IO ()
|
||||
insertEventTriggerCleanupLogsTx :: NonEmpty (TriggerName, [Datetimeoffset]) -> TxET QErr IO ()
|
||||
insertEventTriggerCleanupLogsTx triggerNameWithSchedules =
|
||||
unitQueryE
|
||||
HGE.defaultMSSQLTxErrorHandler
|
||||
@ -955,10 +958,10 @@ insertEventTriggerCleanupLogsTx triggerNameWithSchedules =
|
||||
)
|
||||
schedules
|
||||
)
|
||||
triggerNameWithSchedules
|
||||
(toList triggerNameWithSchedules)
|
||||
|
||||
-- | Get the last scheduled timestamp for a given event trigger name
|
||||
selectLastCleanupScheduledTimestamp :: [TriggerName] -> TxET QErr IO [(TriggerName, (Int, ZonedTime))]
|
||||
selectLastCleanupScheduledTimestamp :: NonEmpty TriggerName -> TxET QErr IO [(TriggerName, (Int, ZonedTime))]
|
||||
selectLastCleanupScheduledTimestamp triggerNames =
|
||||
map
|
||||
( \(triggerName, count, lastScheduledTime) ->
|
||||
@ -976,7 +979,7 @@ selectLastCleanupScheduledTimestamp triggerNames =
|
||||
|]
|
||||
)
|
||||
where
|
||||
triggerNamesValues = generateSQLValuesFromList $ map triggerNameToTxt triggerNames
|
||||
triggerNamesValues = generateSQLValuesFromListWith triggerNameToTxt triggerNames
|
||||
|
||||
deleteAllScheduledCleanupsTx :: TriggerName -> TxE QErr ()
|
||||
deleteAllScheduledCleanupsTx triggerName = do
|
||||
|
@ -883,43 +883,45 @@ mkAllTriggersQ triggerName table triggerOnReplication allCols fullspec = do
|
||||
addCleanupSchedules ::
|
||||
(MonadIO m, MonadError QErr m) =>
|
||||
PGSourceConfig ->
|
||||
[(TriggerName, AutoTriggerLogCleanupConfig)] ->
|
||||
NonEmpty (TriggerName, AutoTriggerLogCleanupConfig) ->
|
||||
m ()
|
||||
addCleanupSchedules sourceConfig triggersWithcleanupConfig =
|
||||
unless (null triggersWithcleanupConfig) $ do
|
||||
let triggerNames = map fst triggersWithcleanupConfig
|
||||
countAndLastSchedules <- liftEitherM $ liftIO $ runPgSourceReadTx sourceConfig $ selectLastCleanupScheduledTimestamp triggerNames
|
||||
currTime <- liftIO $ Time.getCurrentTime
|
||||
let triggerMap = HashMap.fromList $ map (\(triggerName, count, lastTime) -> (triggerName, (count, lastTime))) countAndLastSchedules
|
||||
scheduledTriggersAndTimestamps =
|
||||
mapMaybe
|
||||
( \(triggerName, cleanupConfig) ->
|
||||
let lastScheduledTime = case HashMap.lookup triggerName triggerMap of
|
||||
Nothing -> Just currTime
|
||||
Just (count, lastTime) -> if count < 5 then (Just lastTime) else Nothing
|
||||
in fmap
|
||||
( \lastScheduledTimestamp ->
|
||||
(triggerName, generateScheduleTimes lastScheduledTimestamp cleanupSchedulesToBeGenerated (_atlccSchedule cleanupConfig))
|
||||
)
|
||||
lastScheduledTime
|
||||
)
|
||||
triggersWithcleanupConfig
|
||||
unless (null scheduledTriggersAndTimestamps)
|
||||
$ liftEitherM
|
||||
$ liftIO
|
||||
$ runPgSourceWriteTx sourceConfig InternalRawQuery
|
||||
$ insertEventTriggerCleanupLogsTx scheduledTriggersAndTimestamps
|
||||
addCleanupSchedules sourceConfig triggersWithCleanupConfig = do
|
||||
let triggerNames = fmap fst triggersWithCleanupConfig
|
||||
countAndLastSchedules <- liftEitherM $ liftIO $ runPgSourceReadTx sourceConfig $ selectLastCleanupScheduledTimestamp triggerNames
|
||||
currTime <- liftIO $ Time.getCurrentTime
|
||||
let triggerMap = HashMap.fromList $ map (\(triggerName, count, lastTime) -> (triggerName, (count, lastTime))) countAndLastSchedules
|
||||
scheduledTriggersAndTimestampsMaybe =
|
||||
mapMaybe
|
||||
( \(triggerName, cleanupConfig) ->
|
||||
let lastScheduledTime = case HashMap.lookup triggerName triggerMap of
|
||||
Nothing -> Just currTime
|
||||
Just (count, lastTime) -> if count < 5 then (Just lastTime) else Nothing
|
||||
in fmap
|
||||
( \lastScheduledTimestamp ->
|
||||
(triggerName, generateScheduleTimes lastScheduledTimestamp cleanupSchedulesToBeGenerated (_atlccSchedule cleanupConfig))
|
||||
)
|
||||
lastScheduledTime
|
||||
)
|
||||
(toList triggersWithCleanupConfig)
|
||||
fmap (fromMaybe ())
|
||||
$ for
|
||||
(nonEmpty scheduledTriggersAndTimestampsMaybe)
|
||||
( liftEitherM
|
||||
. liftIO
|
||||
. runPgSourceWriteTx sourceConfig InternalRawQuery
|
||||
. insertEventTriggerCleanupLogsTx
|
||||
)
|
||||
|
||||
-- | Insert the cleanup logs for the fiven trigger name and schedules
|
||||
insertEventTriggerCleanupLogsTx :: [(TriggerName, [Time.UTCTime])] -> PG.TxET QErr IO ()
|
||||
insertEventTriggerCleanupLogsTx triggersWithschedules = do
|
||||
insertEventTriggerCleanupLogsTx :: NonEmpty (TriggerName, [Time.UTCTime]) -> PG.TxET QErr IO ()
|
||||
insertEventTriggerCleanupLogsTx triggersWithSchedules = do
|
||||
let insertCleanupEventsSql =
|
||||
TB.run
|
||||
$ toSQL
|
||||
S.SQLInsert
|
||||
{ siTable = cleanupLogTable,
|
||||
siCols = map unsafePGCol ["trigger_name", "scheduled_at", "status"],
|
||||
siValues = S.ValuesExp $ concatMap genArr triggersWithschedules,
|
||||
siValues = S.ValuesExp $ concatMap genArr triggersWithSchedules,
|
||||
siConflict = Just $ S.DoNothing Nothing,
|
||||
siRet = Nothing
|
||||
}
|
||||
@ -930,7 +932,7 @@ insertEventTriggerCleanupLogsTx triggersWithschedules = do
|
||||
toTupleExp = S.TupleExp . map S.SELit
|
||||
|
||||
-- | Get the last scheduled timestamp for a given event trigger name
|
||||
selectLastCleanupScheduledTimestamp :: [TriggerName] -> PG.TxET QErr IO [(TriggerName, Int, Time.UTCTime)]
|
||||
selectLastCleanupScheduledTimestamp :: NonEmpty TriggerName -> PG.TxET QErr IO [(TriggerName, Int, Time.UTCTime)]
|
||||
selectLastCleanupScheduledTimestamp triggerNames =
|
||||
PG.withQE
|
||||
defaultTxErrorHandler
|
||||
@ -940,7 +942,7 @@ selectLastCleanupScheduledTimestamp triggerNames =
|
||||
WHERE status='scheduled' AND trigger_name = ANY($1::text[])
|
||||
GROUP BY trigger_name
|
||||
|]
|
||||
(Identity $ PGTextArray $ map triggerNameToTxt triggerNames)
|
||||
(Identity . PGTextArray . map triggerNameToTxt $ toList triggerNames)
|
||||
True
|
||||
|
||||
deleteAllScheduledCleanupsTx :: TriggerName -> PG.TxE QErr ()
|
||||
|
@ -230,7 +230,7 @@ class (Backend b) => BackendEventTrigger (b :: BackendType) where
|
||||
addCleanupSchedules ::
|
||||
(MonadIO m, MonadError QErr m) =>
|
||||
SourceConfig b ->
|
||||
[(TriggerName, AutoTriggerLogCleanupConfig)] ->
|
||||
NonEmpty (TriggerName, AutoTriggerLogCleanupConfig) ->
|
||||
m ()
|
||||
|
||||
-- | @deleteAllScheduledCleanups@ deletes all scheduled cleanup logs for a given event trigger
|
||||
|
@ -165,7 +165,7 @@ eventTriggerLogCleanupSpec sourceConfig = do
|
||||
-- run the setup
|
||||
liftIO setup
|
||||
-- run the core generator logic
|
||||
liftIO $ runExceptQErr $ addCleanupSchedules sourceConfig [(triggerName, autoTriggerCleanupConfig)]
|
||||
liftIO $ runExceptQErr $ addCleanupSchedules sourceConfig $ pure (triggerName, autoTriggerCleanupConfig)
|
||||
-- check if the cleanups are scheduled
|
||||
runSQLQuery (getCleanupStatusCount triggerName "scheduled") `shouldReturn` cleanupSchedulesToBeGenerated
|
||||
-- finally teardown
|
||||
@ -176,7 +176,7 @@ eventTriggerLogCleanupSpec sourceConfig = do
|
||||
-- run the setup
|
||||
liftIO setup
|
||||
-- add some cleanup schedules
|
||||
liftIO $ runExceptQErr $ addCleanupSchedules sourceConfig [(triggerName, autoTriggerCleanupConfig)]
|
||||
liftIO $ runExceptQErr $ addCleanupSchedules sourceConfig $ pure (triggerName, autoTriggerCleanupConfig)
|
||||
-- move 11 minutes into the future, this should do the following:
|
||||
-- 1. render 10 cleanup schedules as dead
|
||||
-- 2. 1 schedule as ready to be delivered
|
||||
@ -201,7 +201,7 @@ eventTriggerLogCleanupSpec sourceConfig = do
|
||||
-- run the setup
|
||||
liftIO setup
|
||||
-- add some cleanup schedules
|
||||
liftIO $ runExceptQErr $ addCleanupSchedules sourceConfig [(triggerName, autoTriggerCleanupConfig)]
|
||||
liftIO $ runExceptQErr $ addCleanupSchedules sourceConfig $ pure (triggerName, autoTriggerCleanupConfig)
|
||||
-- move 1 minute into the future
|
||||
runSQLQuery $ reduceScheduledAtBy triggerName 1
|
||||
-- get cleanup actions to deliver
|
||||
@ -221,7 +221,7 @@ eventTriggerLogCleanupSpec sourceConfig = do
|
||||
-- run the setup
|
||||
liftIO setup
|
||||
-- add some cleanup schedules
|
||||
liftIO $ runExceptQErr $ addCleanupSchedules sourceConfig [(triggerName, autoTriggerCleanupConfig)]
|
||||
liftIO $ runExceptQErr $ addCleanupSchedules sourceConfig $ pure (triggerName, autoTriggerCleanupConfig)
|
||||
-- move 1 minute into the future
|
||||
runSQLQuery $ reduceScheduledAtBy triggerName 1
|
||||
-- get cleanup actions to deliver
|
||||
|
Loading…
Reference in New Issue
Block a user