From 97b1155bf83bbed223c763c681b88abfac5521e0 Mon Sep 17 00:00:00 2001 From: Karthikeyan Chinnakonda Date: Thu, 2 Jul 2020 17:27:09 +0530 Subject: [PATCH] server: unlock scheduled events on graceful shutdown (#4928) --- CHANGELOG.md | 1 + server/graphql-engine.cabal | 1 + server/src-lib/Hasura/App.hs | 75 +++++++++++---- server/src-lib/Hasura/Eventing/Common.hs | 40 ++++++++ .../src-lib/Hasura/Eventing/EventTrigger.hs | 33 ++----- .../Hasura/Eventing/ScheduledTrigger.hs | 93 +++++++++++++++++-- .../Hasura/RQL/Types/ScheduledTrigger.hs | 6 ++ 7 files changed, 193 insertions(+), 56 deletions(-) create mode 100644 server/src-lib/Hasura/Eventing/Common.hs diff --git a/CHANGELOG.md b/CHANGELOG.md index bfa5051d5ed..f99cdf5002a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ - server: add new `--conn-lifetime` and `HASURA_GRAPHQL_PG_CONN_LIFETIME` options for expiring connections after some amount of active time (#5087) - server: shrink libpq connection request/response buffers back to 1MB if they grow beyond 2MB, fixing leak-like behavior on active servers (#5087) +- server: unlock locked scheduled events on graceful shutdown (#4928) - server: disable prepared statements for mutations as we end up with single-use objects which result in excessive memory consumption for mutation heavy workloads (#5255) - console: allow configuring statement timeout on console RawSQL page (close #4998) (#5045) - console: support tracking partitioned tables (close #5071) (#5258) diff --git a/server/graphql-engine.cabal b/server/graphql-engine.cabal index 4417cd7066f..6fc4cc869a7 100644 --- a/server/graphql-engine.cabal +++ b/server/graphql-engine.cabal @@ -404,6 +404,7 @@ library , Hasura.Eventing.HTTP , Hasura.Eventing.EventTrigger , Hasura.Eventing.ScheduledTrigger + , Hasura.Eventing.Common , Control.Lens.Extended , Data.Aeson.Extended diff --git a/server/src-lib/Hasura/App.hs b/server/src-lib/Hasura/App.hs index 7bd77287828..0f6e861dd7b 100644 --- a/server/src-lib/Hasura/App.hs +++ b/server/src-lib/Hasura/App.hs @@ -2,7 +2,7 @@ module Hasura.App where -import Control.Concurrent.STM.TVar (readTVarIO) +import Control.Concurrent.STM.TVar (readTVarIO,TVar) import Control.Lens (view, _2) import Control.Monad.Base import Control.Monad.Catch (MonadCatch, MonadThrow, onException) @@ -36,6 +36,7 @@ import qualified Text.Mustache.Compile as M import Hasura.Db import Hasura.EncJSON import Hasura.Eventing.EventTrigger +import Hasura.Eventing.Common import Hasura.Eventing.ScheduledTrigger import Hasura.GraphQL.Execute (MonadGQLExecutionCheck (..), checkQueryInAllowlist) @@ -318,13 +319,15 @@ runHGEServer ServeOptions{..} InitCtx{..} pgExecCtx initTime = do getFromEnv (Milliseconds defaultFetchInterval) "HASURA_GRAPHQL_EVENTS_FETCH_INTERVAL" logEnvHeaders <- liftIO $ getFromEnv False "LOG_HEADERS_FROM_ENV" + lockedEventsCtx <- liftIO $ atomically $ initLockedEventsCtx + -- prepare event triggers data prepareEvents _icPgPool logger eventEngineCtx <- liftIO $ atomically $ initEventEngineCtx maxEvThrds fetchI unLogger logger $ mkGenericStrLog LevelInfo "event_triggers" "starting workers" _eventQueueThread <- C.forkImmortal "processEventQueue" logger $ liftIO $ processEventQueue logger logEnvHeaders - _icHttpManager _icPgPool (getSCFromRef cacheRef) eventEngineCtx + _icHttpManager _icPgPool (getSCFromRef cacheRef) eventEngineCtx lockedEventsCtx -- start a backgroud thread to handle async actions _asyncActionsThread <- C.forkImmortal "asyncActionsProcessor" logger $ liftIO $ @@ -334,9 +337,11 @@ runHGEServer ServeOptions{..} InitCtx{..} pgExecCtx initTime = do void $ liftIO $ C.forkImmortal "runCronEventsGenerator" logger $ runCronEventsGenerator logger _icPgPool (getSCFromRef cacheRef) + -- prepare scheduled triggers + prepareScheduledEvents _icPgPool logger + -- start a background thread to deliver the scheduled events - void $ liftIO $ C.forkImmortal "processScheduledTriggers" logger $ - processScheduledTriggers logger logEnvHeaders _icHttpManager _icPgPool (getSCFromRef cacheRef) + void $ liftIO $ C.forkImmortal "processScheduledTriggers" logger $ processScheduledTriggers logger logEnvHeaders _icHttpManager _icPgPool (getSCFromRef cacheRef) lockedEventsCtx -- start a background thread to check for updates _updateThread <- C.forkImmortal "checkForUpdates" logger $ liftIO $ @@ -357,7 +362,7 @@ runHGEServer ServeOptions{..} InitCtx{..} pgExecCtx initTime = do let warpSettings = Warp.setPort soPort . Warp.setHost soHost . Warp.setGracefulShutdownTimeout (Just 30) -- 30s graceful shutdown - . Warp.setInstallShutdownHandler (shutdownHandler _icLoggers shutdownApp eventEngineCtx _icPgPool) + . Warp.setInstallShutdownHandler (shutdownHandler _icLoggers shutdownApp lockedEventsCtx _icPgPool) $ Warp.defaultSettings liftIO $ Warp.runSettings warpSettings app @@ -378,23 +383,47 @@ runHGEServer ServeOptions{..} InitCtx{..} pgExecCtx initTime = do res <- liftIO $ runTx pool (Q.ReadCommitted, Nothing) unlockAllEvents either printErrJExit return res + -- | prepareScheduledEvents is like prepareEvents, but for scheduled triggers + prepareScheduledEvents pool (Logger logger) = do + liftIO $ logger $ mkGenericStrLog LevelInfo "scheduled_triggers" "preparing data" + res <- liftIO $ runTx pool (Q.ReadCommitted, Nothing) unlockAllLockedScheduledEvents + either printErrJExit return res + -- | shutdownEvents will be triggered when a graceful shutdown has been inititiated, it will - -- get the locked events from the event engine context and then it will unlock all those events. + -- get the locked events from the event engine context and the scheduled event engine context + -- then it will unlock all those events. -- It may happen that an event may be processed more than one time, an event that has been already -- processed but not been marked as delivered in the db will be unlocked by `shutdownEvents` -- and will be processed when the events are proccessed next time. - shutdownEvents :: Q.PGPool -> Logger Hasura -> EventEngineCtx -> IO () - shutdownEvents pool (Logger logger) EventEngineCtx {..} = do + shutdownEvents + :: Q.PGPool + -> Logger Hasura + -> LockedEventsCtx + -> IO () + shutdownEvents pool hasuraLogger@(Logger logger) LockedEventsCtx {..} = do liftIO $ logger $ mkGenericStrLog LevelInfo "event_triggers" "unlocking events that are locked by the HGE" - lockedEvents <- readTVarIO _eeCtxLockedEvents - liftIO $ do - when (not $ Set.null $ lockedEvents) $ do - res <- runTx pool (Q.ReadCommitted, Nothing) (unlockEvents $ toList lockedEvents) - case res of - Left err -> logger $ mkGenericStrLog - LevelWarn "event_triggers" ("Error in unlocking the events " ++ (show err)) - Right count -> logger $ mkGenericStrLog - LevelInfo "event_triggers" ((show count) ++ " events were updated") + unlockEventsForShutdown pool hasuraLogger "event_triggers" "" unlockEvents leEvents + liftIO $ logger $ mkGenericStrLog LevelInfo "scheduled_triggers" "unlocking scheduled events that are locked by the HGE" + unlockEventsForShutdown pool hasuraLogger "scheduled_triggers" "cron events" unlockCronEvents leCronEvents + unlockEventsForShutdown pool hasuraLogger "scheduled_triggers" "scheduled events" unlockCronEvents leStandAloneEvents + + unlockEventsForShutdown + :: Q.PGPool + -> Logger Hasura + -> Text -- ^ trigger type + -> Text -- ^ event type + -> ([eventId] -> Q.TxE QErr Int) + -> TVar (Set.Set eventId) + -> IO () + unlockEventsForShutdown pool (Logger logger) triggerType eventType doUnlock lockedIdsVar = do + lockedIds <- readTVarIO lockedIdsVar + unless (Set.null lockedIds) do + result <- runTx pool (Q.ReadCommitted, Nothing) (doUnlock $ toList lockedIds) + case result of + Left err -> logger $ mkGenericStrLog LevelWarn triggerType $ + "Error while unlocking " ++ T.unpack eventType ++ " events: " ++ show err + Right count -> logger $ mkGenericStrLog LevelInfo triggerType $ + show count ++ " " ++ T.unpack eventType ++ " events successfully unlocked" getFromEnv :: (Read a) => a -> String -> IO a getFromEnv defaults env = do @@ -413,12 +442,18 @@ runHGEServer ServeOptions{..} InitCtx{..} pgExecCtx initTime = do -- shuts down the server and associated resources. -- Structuring things this way lets us decide elsewhere exactly how -- we want to control shutdown. - shutdownHandler :: Loggers -> IO () -> EventEngineCtx -> Q.PGPool -> IO () -> IO () - shutdownHandler (Loggers loggerCtx (Logger logger) _) shutdownApp eeCtx pool closeSocket = + shutdownHandler + :: Loggers + -> IO () + -> LockedEventsCtx + -> Q.PGPool + -> IO () + -> IO () + shutdownHandler (Loggers loggerCtx (Logger logger) _) shutdownApp leCtx pool closeSocket = void . Async.async $ do waitForShutdown _icShutdownLatch logger $ mkGenericStrLog LevelInfo "server" "gracefully shutting down server" - shutdownEvents pool (Logger logger) eeCtx + shutdownEvents pool (Logger logger) leCtx closeSocket shutdownApp cleanLoggerCtx loggerCtx diff --git a/server/src-lib/Hasura/Eventing/Common.hs b/server/src-lib/Hasura/Eventing/Common.hs new file mode 100644 index 00000000000..4ba94d2106d --- /dev/null +++ b/server/src-lib/Hasura/Eventing/Common.hs @@ -0,0 +1,40 @@ +module Hasura.Eventing.Common where + +import Hasura.Prelude +import Control.Concurrent.STM.TVar +import Control.Monad.STM +import Hasura.RQL.Types.EventTrigger (EventId) +import Hasura.RQL.Types.ScheduledTrigger (CronEventId,StandAloneScheduledEventId) + +import qualified Data.Set as Set + +data LockedEventsCtx + = LockedEventsCtx + { leCronEvents :: TVar (Set.Set CronEventId) + , leStandAloneEvents :: TVar (Set.Set StandAloneScheduledEventId) + , leEvents :: TVar (Set.Set EventId) + } + +initLockedEventsCtx :: STM LockedEventsCtx +initLockedEventsCtx = do + leCronEvents <- newTVar Set.empty + leStandAloneEvents <- newTVar Set.empty + leEvents <- newTVar Set.empty + return $ LockedEventsCtx{..} + +-- | After the events are fetched from the DB, we store the locked events +-- in a hash set(order doesn't matter and look ups are faster) in the +-- event engine context +saveLockedEvents :: [Text] -> TVar (Set.Set Text) -> IO () +saveLockedEvents eventIds lockedEvents = + atomically $ do + lockedEventsVals <- readTVar lockedEvents + writeTVar lockedEvents $! + Set.union lockedEventsVals $ Set.fromList eventIds + +-- | Remove an event from the 'LockedEventsCtx' after it has been processed +removeEventFromLockedEvents :: Text -> TVar (Set.Set Text) -> IO () +removeEventFromLockedEvents eventId lockedEvents = + atomically $ do + lockedEventsVals <- readTVar lockedEvents + writeTVar lockedEvents $! Set.delete eventId lockedEventsVals diff --git a/server/src-lib/Hasura/Eventing/EventTrigger.hs b/server/src-lib/Hasura/Eventing/EventTrigger.hs index 8df24931956..1a018ebe5b6 100644 --- a/server/src-lib/Hasura/Eventing/EventTrigger.hs +++ b/server/src-lib/Hasura/Eventing/EventTrigger.hs @@ -55,6 +55,7 @@ import Data.String import Data.Time.Clock import Data.Word import Hasura.Eventing.HTTP +import Hasura.Eventing.Common import Hasura.HTTP import Hasura.Prelude import Hasura.RQL.DDL.Headers @@ -71,7 +72,6 @@ import qualified Hasura.Logging as L import qualified Network.HTTP.Client as HTTP import qualified Database.PG.Query.PTI as PTI import qualified PostgreSQL.Binary.Encoding as PE -import qualified Data.Set as Set data TriggerMetadata = TriggerMetadata { tmName :: TriggerName } @@ -105,7 +105,6 @@ data EventEngineCtx = EventEngineCtx { _eeCtxEventThreadsCapacity :: TVar Int , _eeCtxFetchInterval :: DiffTime - , _eeCtxLockedEvents :: TVar (Set.Set EventId) } data DeliveryInfo @@ -147,7 +146,6 @@ defaultFetchInterval = seconds 1 initEventEngineCtx :: Int -> DiffTime -> STM EventEngineCtx initEventEngineCtx maxT _eeCtxFetchInterval = do _eeCtxEventThreadsCapacity <- newTVar maxT - _eeCtxLockedEvents <- newTVar Set.empty return $ EventEngineCtx{..} -- | Service events from our in-DB queue. @@ -161,9 +159,9 @@ initEventEngineCtx maxT _eeCtxFetchInterval = do -- - limit webhook HTTP concurrency per HASURA_GRAPHQL_EVENTS_HTTP_POOL_SIZE processEventQueue :: (HasVersion) => L.Logger L.Hasura -> LogEnvHeaders -> HTTP.Manager-> Q.PGPool - -> IO SchemaCache -> EventEngineCtx + -> IO SchemaCache -> EventEngineCtx -> LockedEventsCtx -> IO void -processEventQueue logger logenv httpMgr pool getSchemaCache eeCtx@EventEngineCtx{..} = do +processEventQueue logger logenv httpMgr pool getSchemaCache eeCtx@EventEngineCtx{..} LockedEventsCtx {leEvents}= do events0 <- popEventsBatch go events0 0 False where @@ -175,26 +173,9 @@ processEventQueue logger logenv httpMgr pool getSchemaCache eeCtx@EventEngineCtx L.unLogger logger $ EventInternalErr err return [] Right events -> do - saveLockedEvents events + saveLockedEvents (map eId events) leEvents return events - -- After the events are fetched from the DB, we store the locked events - -- in a hash set(order doesn't matter and look ups are faster) in the - -- event engine context - saveLockedEvents :: [Event] -> IO () - saveLockedEvents evts = - liftIO $ atomically $ do - lockedEvents <- readTVar _eeCtxLockedEvents - let evtsIds = map eId evts - let newLockedEvents = Set.union lockedEvents (Set.fromList evtsIds) - writeTVar _eeCtxLockedEvents $! newLockedEvents - - removeEventFromLockedEvents :: EventId -> IO () - removeEventFromLockedEvents eventId = do - liftIO $ atomically $ do - lockedEvents <- readTVar _eeCtxLockedEvents - writeTVar _eeCtxLockedEvents $! Set.delete eventId lockedEvents - -- work on this batch of events while prefetching the next. Recurse after we've forked workers -- for each in the batch, minding the requested pool size. go :: [Event] -> Int -> Bool -> IO void @@ -211,7 +192,7 @@ processEventQueue logger logenv httpMgr pool getSchemaCache eeCtx@EventEngineCtx runReaderT (withEventEngineCtx eeCtx $ (processEvent event)) (logger, httpMgr) -- removing an event from the _eeCtxLockedEvents after the event has -- been processed - removeEventFromLockedEvents (eId event) + removeEventFromLockedEvents (eId event) leEvents wait eventsNextA let lenEvents = length events @@ -282,10 +263,10 @@ withEventEngineCtx :: withEventEngineCtx eeCtx = bracket_ (decrementThreadCount eeCtx) (incrementThreadCount eeCtx) incrementThreadCount :: MonadIO m => EventEngineCtx -> m () -incrementThreadCount (EventEngineCtx c _ _) = liftIO $ atomically $ modifyTVar' c (+1) +incrementThreadCount (EventEngineCtx c _) = liftIO $ atomically $ modifyTVar' c (+1) decrementThreadCount :: MonadIO m => EventEngineCtx -> m () -decrementThreadCount (EventEngineCtx c _ _) = liftIO $ atomically $ do +decrementThreadCount (EventEngineCtx c _) = liftIO $ atomically $ do countThreads <- readTVar c if countThreads > 0 then modifyTVar' c (\v -> v - 1) diff --git a/server/src-lib/Hasura/Eventing/ScheduledTrigger.hs b/server/src-lib/Hasura/Eventing/ScheduledTrigger.hs index 96859e3d600..a5342d57f3f 100644 --- a/server/src-lib/Hasura/Eventing/ScheduledTrigger.hs +++ b/server/src-lib/Hasura/Eventing/ScheduledTrigger.hs @@ -66,10 +66,16 @@ module Hasura.Eventing.ScheduledTrigger , generateScheduleTimes , insertCronEvents , StandAloneScheduledEvent(..) + , initLockedEventsCtx + , LockedEventsCtx(..) + , unlockCronEvents + , unlockStandaloneScheduledEvents + , unlockAllLockedScheduledEvents ) where import Control.Arrow.Extended (dup) import Control.Concurrent.Extended (sleep) +import Control.Concurrent.STM.TVar import Data.Has import Data.Int (Int64) import Data.List (unfoldr) @@ -83,6 +89,7 @@ import Hasura.Server.Version (HasVersion) import Hasura.RQL.DDL.EventTrigger (getHeaderInfosFromConf) import Hasura.SQL.DML import Hasura.SQL.Types +import Hasura.Eventing.Common import System.Cron import qualified Data.Aeson as J @@ -96,6 +103,10 @@ import qualified Hasura.Logging as L import qualified Network.HTTP.Client as HTTP import qualified Text.Builder as TB (run) import qualified PostgreSQL.Binary.Decoding as PD +import qualified Data.Set as Set +import qualified Database.PG.Query.PTI as PTI +import qualified PostgreSQL.Binary.Encoding as PE + newtype ScheduledTriggerInternalErr = ScheduledTriggerInternalErr QErr @@ -141,6 +152,8 @@ instance Q.FromCol ScheduledEventStatus where instance J.ToJSON ScheduledEventStatus where toJSON = J.String . scheduledEventStatusToText +type ScheduledEventId = Text + data CronTriggerStats = CronTriggerStats { ctsName :: !TriggerName @@ -156,7 +169,7 @@ data CronEventSeed data CronEventPartial = CronEventPartial - { cepId :: !Text + { cepId :: !CronEventId , cepName :: !TriggerName , cepScheduledTime :: !UTCTime , cepTries :: !Int @@ -164,7 +177,7 @@ data CronEventPartial data ScheduledEventFull = ScheduledEventFull - { sefId :: !Text + { sefId :: !ScheduledEventId , sefName :: !(Maybe TriggerName) -- ^ sefName is the name of the cron trigger. -- A standalone scheduled event is not associated with a name, so in that @@ -182,7 +195,7 @@ $(J.deriveToJSON (J.aesonDrop 3 J.snakeCase) {J.omitNothingFields = True} ''Sche data StandAloneScheduledEvent = StandAloneScheduledEvent - { saseId :: !Text + { saseId :: !StandAloneScheduledEventId , saseScheduledTime :: !UTCTime , saseTries :: !Int , saseWebhook :: !InputWebhook @@ -318,14 +331,19 @@ processCronEvents -> HTTP.Manager -> Q.PGPool -> IO SchemaCache + -> TVar (Set.Set CronEventId) -> IO () -processCronEvents logger logEnv httpMgr pgpool getSC = do +processCronEvents logger logEnv httpMgr pgpool getSC lockedCronEvents = do cronTriggersInfo <- scCronTriggers <$> getSC cronScheduledEvents <- runExceptT $ Q.runTx pgpool (Q.ReadCommitted, Just Q.ReadWrite) getPartialCronEvents case cronScheduledEvents of - Right partialEvents -> + Right partialEvents -> do + -- save the locked standalone events that have been fetched from the + -- database, the events stored here will be unlocked in case a + -- graceful shutdown is initiated in midst of processing these events + saveLockedEvents (map cepId partialEvents) lockedCronEvents for_ partialEvents $ \(CronEventPartial id' name st tries)-> do case Map.lookup name cronTriggersInfo of Nothing -> logInternalError $ @@ -345,6 +363,7 @@ processCronEvents logger logEnv httpMgr pgpool getSC = do ctiComment finally <- runExceptT $ runReaderT (processScheduledEvent logEnv pgpool scheduledEvent CronScheduledEvent) (logger, httpMgr) + removeEventFromLockedEvents id' lockedCronEvents either logInternalError pure finally Left err -> logInternalError err where @@ -356,13 +375,18 @@ processStandAloneEvents -> LogEnvHeaders -> HTTP.Manager -> Q.PGPool + -> TVar (Set.Set StandAloneScheduledEventId) -> IO () -processStandAloneEvents logger logEnv httpMgr pgpool = do +processStandAloneEvents logger logEnv httpMgr pgpool lockedStandAloneEvents = do standAloneScheduledEvents <- runExceptT $ Q.runTx pgpool (Q.ReadCommitted, Just Q.ReadWrite) getOneOffScheduledEvents case standAloneScheduledEvents of - Right standAloneScheduledEvents' -> + Right standAloneScheduledEvents' -> do + -- save the locked standalone events that have been fetched from the + -- database, the events stored here will be unlocked in case a + -- graceful shutdown is initiated in midst of processing these events + saveLockedEvents (map saseId standAloneScheduledEvents') lockedStandAloneEvents for_ standAloneScheduledEvents' $ \(StandAloneScheduledEvent id' scheduledTime @@ -394,6 +418,7 @@ processStandAloneEvents logger logEnv httpMgr pgpool = do finally <- runExceptT $ runReaderT (processScheduledEvent logEnv pgpool scheduledEvent StandAloneEvent) $ (logger, httpMgr) + removeEventFromLockedEvents id' lockedStandAloneEvents either logInternalError pure finally Left headerInfoErr -> logInternalError headerInfoErr @@ -411,11 +436,12 @@ processScheduledTriggers -> HTTP.Manager -> Q.PGPool -> IO SchemaCache + -> LockedEventsCtx -> IO void -processScheduledTriggers logger logEnv httpMgr pgpool getSC= +processScheduledTriggers logger logEnv httpMgr pgpool getSC LockedEventsCtx {..} = forever $ do - processCronEvents logger logEnv httpMgr pgpool getSC - processStandAloneEvents logger logEnv httpMgr pgpool + processCronEvents logger logEnv httpMgr pgpool getSC leCronEvents + processStandAloneEvents logger logEnv httpMgr pgpool leStandAloneEvents sleep (minutes 1) processScheduledEvent :: @@ -684,3 +710,50 @@ getOneOffScheduledEvents = do liftExceptTIO :: (MonadError e m, MonadIO m) => ExceptT e IO a -> m a liftExceptTIO m = liftEither =<< liftIO (runExceptT m) + +newtype ScheduledEventIdArray = + ScheduledEventIdArray { unScheduledEventIdArray :: [ScheduledEventId]} + deriving (Show, Eq) + +instance Q.ToPrepArg ScheduledEventIdArray where + toPrepVal (ScheduledEventIdArray l) = Q.toPrepValHelper PTI.unknown encoder $ l + where + -- 25 is the OID value of TEXT, https://jdbc.postgresql.org/development/privateapi/constant-values.html + encoder = PE.array 25 . PE.dimensionArray foldl' (PE.encodingArray . PE.text_strict) + +unlockCronEvents :: [ScheduledEventId] -> Q.TxE QErr Int +unlockCronEvents scheduledEventIds = + (runIdentity . Q.getRow) <$> Q.withQE defaultTxErrorHandler + [Q.sql| + WITH "cte" AS + (UPDATE hdb_catalog.hdb_cron_events + SET status = 'scheduled' + WHERE id = ANY($1::text[]) and status = 'locked' + RETURNING *) + SELECT count(*) FROM "cte" + |] (Identity $ ScheduledEventIdArray scheduledEventIds) True + +unlockStandaloneScheduledEvents :: [ScheduledEventId] -> Q.TxE QErr Int +unlockStandaloneScheduledEvents scheduledEventIds = + (runIdentity . Q.getRow) <$> Q.withQE defaultTxErrorHandler + [Q.sql| + WITH "cte" AS + (UPDATE hdb_catalog.hdb_scheduled_events + SET status = 'scheduled' + WHERE id = ANY($1::text[]) AND status = 'locked' + RETURNING *) + SELECT count(*) FROM "cte" + |] (Identity $ ScheduledEventIdArray scheduledEventIds) True + +unlockAllLockedScheduledEvents :: Q.TxE QErr () +unlockAllLockedScheduledEvents = do + Q.unitQE defaultTxErrorHandler [Q.sql| + UPDATE hdb_catalog.hdb_cron_events + SET status = 'scheduled' + WHERE status = 'locked' + |] () True + Q.unitQE defaultTxErrorHandler [Q.sql| + UPDATE hdb_catalog.hdb_scheduled_events + SET status = 'scheduled' + WHERE status = 'locked' + |] () True diff --git a/server/src-lib/Hasura/RQL/Types/ScheduledTrigger.hs b/server/src-lib/Hasura/RQL/Types/ScheduledTrigger.hs index 9b306220a5d..acb5b82442f 100644 --- a/server/src-lib/Hasura/RQL/Types/ScheduledTrigger.hs +++ b/server/src-lib/Hasura/RQL/Types/ScheduledTrigger.hs @@ -5,6 +5,8 @@ module Hasura.RQL.Types.ScheduledTrigger , CreateCronTrigger(..) , STRetryConf(..) , CreateScheduledEvent(..) + , CronEventId + , StandAloneScheduledEventId , formatTime' , defaultSTRetryConf ) where @@ -25,6 +27,10 @@ import qualified Data.Aeson as J import qualified Data.Text as T import qualified Hasura.RQL.Types.EventTrigger as ET +type CronEventId = Text + +type StandAloneScheduledEventId = Text + data STRetryConf = STRetryConf { strcNumRetries :: !Int