mirror of
synced 2024-12-14 08:02:15 +03:00
server: unlock scheduled events on graceful shutdown (#4928)
This commit is contained in:
@ -8,6 +8,7 @@
- server: add new `--conn-lifetime` and `HASURA_GRAPHQL_PG_CONN_LIFETIME` options for expiring connections after some amount of active time (#5087)
- server: shrink libpq connection request/response buffers back to 1MB if they grow beyond 2MB, fixing leak-like behavior on active servers (#5087)
- server: unlock locked scheduled events on graceful shutdown (#4928)
- server: disable prepared statements for mutations as we end up with single-use objects which result in excessive memory consumption for mutation heavy workloads (#5255)
- console: allow configuring statement timeout on console RawSQL page (close #4998) (#5045)
- console: support tracking partitioned tables (close #5071) (#5258)
@ -404,6 +404,7 @@ library
, Hasura.Eventing.HTTP
, Hasura.Eventing.EventTrigger
, Hasura.Eventing.ScheduledTrigger
, Hasura.Eventing.Common
, Control.Lens.Extended
, Data.Aeson.Extended
@ -2,7 +2,7 @@
module Hasura.App where
import Control.Concurrent.STM.TVar (readTVarIO)
import Control.Concurrent.STM.TVar (readTVarIO,TVar)
import Control.Lens (view, _2)
import Control.Monad.Base
import Control.Monad.Catch (MonadCatch, MonadThrow, onException)
@ -36,6 +36,7 @@ import qualified Text.Mustache.Compile as M
import Hasura.Db
import Hasura.EncJSON
import Hasura.Eventing.EventTrigger
import Hasura.Eventing.Common
import Hasura.Eventing.ScheduledTrigger
import Hasura.GraphQL.Execute (MonadGQLExecutionCheck (..),
@ -318,13 +319,15 @@ runHGEServer ServeOptions{..} InitCtx{..} pgExecCtx initTime = do
getFromEnv (Milliseconds defaultFetchInterval) "HASURA_GRAPHQL_EVENTS_FETCH_INTERVAL"
logEnvHeaders <- liftIO $ getFromEnv False "LOG_HEADERS_FROM_ENV"
lockedEventsCtx <- liftIO $ atomically $ initLockedEventsCtx
-- prepare event triggers data
prepareEvents _icPgPool logger
eventEngineCtx <- liftIO $ atomically $ initEventEngineCtx maxEvThrds fetchI
unLogger logger $ mkGenericStrLog LevelInfo "event_triggers" "starting workers"
_eventQueueThread <- C.forkImmortal "processEventQueue" logger $ liftIO $
processEventQueue logger logEnvHeaders
_icHttpManager _icPgPool (getSCFromRef cacheRef) eventEngineCtx
_icHttpManager _icPgPool (getSCFromRef cacheRef) eventEngineCtx lockedEventsCtx
-- start a backgroud thread to handle async actions
_asyncActionsThread <- C.forkImmortal "asyncActionsProcessor" logger $ liftIO $
@ -334,9 +337,11 @@ runHGEServer ServeOptions{..} InitCtx{..} pgExecCtx initTime = do
void $ liftIO $ C.forkImmortal "runCronEventsGenerator" logger $
runCronEventsGenerator logger _icPgPool (getSCFromRef cacheRef)
-- prepare scheduled triggers
prepareScheduledEvents _icPgPool logger
-- start a background thread to deliver the scheduled events
void $ liftIO $ C.forkImmortal "processScheduledTriggers" logger $
processScheduledTriggers logger logEnvHeaders _icHttpManager _icPgPool (getSCFromRef cacheRef)
void $ liftIO $ C.forkImmortal "processScheduledTriggers" logger $ processScheduledTriggers logger logEnvHeaders _icHttpManager _icPgPool (getSCFromRef cacheRef) lockedEventsCtx
-- start a background thread to check for updates
_updateThread <- C.forkImmortal "checkForUpdates" logger $ liftIO $
@ -357,7 +362,7 @@ runHGEServer ServeOptions{..} InitCtx{..} pgExecCtx initTime = do
let warpSettings = Warp.setPort soPort
. Warp.setHost soHost
. Warp.setGracefulShutdownTimeout (Just 30) -- 30s graceful shutdown
. Warp.setInstallShutdownHandler (shutdownHandler _icLoggers shutdownApp eventEngineCtx _icPgPool)
. Warp.setInstallShutdownHandler (shutdownHandler _icLoggers shutdownApp lockedEventsCtx _icPgPool)
$ Warp.defaultSettings
liftIO $ Warp.runSettings warpSettings app
@ -378,23 +383,47 @@ runHGEServer ServeOptions{..} InitCtx{..} pgExecCtx initTime = do
res <- liftIO $ runTx pool (Q.ReadCommitted, Nothing) unlockAllEvents
either printErrJExit return res
-- | prepareScheduledEvents is like prepareEvents, but for scheduled triggers
prepareScheduledEvents pool (Logger logger) = do
liftIO $ logger $ mkGenericStrLog LevelInfo "scheduled_triggers" "preparing data"
res <- liftIO $ runTx pool (Q.ReadCommitted, Nothing) unlockAllLockedScheduledEvents
either printErrJExit return res
-- | shutdownEvents will be triggered when a graceful shutdown has been inititiated, it will
-- get the locked events from the event engine context and then it will unlock all those events.
-- get the locked events from the event engine context and the scheduled event engine context
-- then it will unlock all those events.
-- It may happen that an event may be processed more than one time, an event that has been already
-- processed but not been marked as delivered in the db will be unlocked by `shutdownEvents`
-- and will be processed when the events are proccessed next time.
shutdownEvents :: Q.PGPool -> Logger Hasura -> EventEngineCtx -> IO ()
shutdownEvents pool (Logger logger) EventEngineCtx {..} = do
:: Q.PGPool
-> Logger Hasura
-> LockedEventsCtx
-> IO ()
shutdownEvents pool hasuraLogger@(Logger logger) LockedEventsCtx {..} = do
liftIO $ logger $ mkGenericStrLog LevelInfo "event_triggers" "unlocking events that are locked by the HGE"
lockedEvents <- readTVarIO _eeCtxLockedEvents
liftIO $ do
when (not $ Set.null $ lockedEvents) $ do
res <- runTx pool (Q.ReadCommitted, Nothing) (unlockEvents $ toList lockedEvents)
case res of
Left err -> logger $ mkGenericStrLog
LevelWarn "event_triggers" ("Error in unlocking the events " ++ (show err))
Right count -> logger $ mkGenericStrLog
LevelInfo "event_triggers" ((show count) ++ " events were updated")
unlockEventsForShutdown pool hasuraLogger "event_triggers" "" unlockEvents leEvents
liftIO $ logger $ mkGenericStrLog LevelInfo "scheduled_triggers" "unlocking scheduled events that are locked by the HGE"
unlockEventsForShutdown pool hasuraLogger "scheduled_triggers" "cron events" unlockCronEvents leCronEvents
unlockEventsForShutdown pool hasuraLogger "scheduled_triggers" "scheduled events" unlockCronEvents leStandAloneEvents
:: Q.PGPool
-> Logger Hasura
-> Text -- ^ trigger type
-> Text -- ^ event type
-> ([eventId] -> Q.TxE QErr Int)
-> TVar (Set.Set eventId)
-> IO ()
unlockEventsForShutdown pool (Logger logger) triggerType eventType doUnlock lockedIdsVar = do
lockedIds <- readTVarIO lockedIdsVar
unless (Set.null lockedIds) do
result <- runTx pool (Q.ReadCommitted, Nothing) (doUnlock $ toList lockedIds)
case result of
Left err -> logger $ mkGenericStrLog LevelWarn triggerType $
"Error while unlocking " ++ T.unpack eventType ++ " events: " ++ show err
Right count -> logger $ mkGenericStrLog LevelInfo triggerType $
show count ++ " " ++ T.unpack eventType ++ " events successfully unlocked"
getFromEnv :: (Read a) => a -> String -> IO a
getFromEnv defaults env = do
@ -413,12 +442,18 @@ runHGEServer ServeOptions{..} InitCtx{..} pgExecCtx initTime = do
-- shuts down the server and associated resources.
-- Structuring things this way lets us decide elsewhere exactly how
-- we want to control shutdown.
shutdownHandler :: Loggers -> IO () -> EventEngineCtx -> Q.PGPool -> IO () -> IO ()
shutdownHandler (Loggers loggerCtx (Logger logger) _) shutdownApp eeCtx pool closeSocket =
:: Loggers
-> IO ()
-> LockedEventsCtx
-> Q.PGPool
-> IO ()
-> IO ()
shutdownHandler (Loggers loggerCtx (Logger logger) _) shutdownApp leCtx pool closeSocket =
void . Async.async $ do
waitForShutdown _icShutdownLatch
logger $ mkGenericStrLog LevelInfo "server" "gracefully shutting down server"
shutdownEvents pool (Logger logger) eeCtx
shutdownEvents pool (Logger logger) leCtx
cleanLoggerCtx loggerCtx
Normal file
Normal file
@ -0,0 +1,40 @@
module Hasura.Eventing.Common where
import Hasura.Prelude
import Control.Concurrent.STM.TVar
import Control.Monad.STM
import Hasura.RQL.Types.EventTrigger (EventId)
import Hasura.RQL.Types.ScheduledTrigger (CronEventId,StandAloneScheduledEventId)
import qualified Data.Set as Set
data LockedEventsCtx
= LockedEventsCtx
{ leCronEvents :: TVar (Set.Set CronEventId)
, leStandAloneEvents :: TVar (Set.Set StandAloneScheduledEventId)
, leEvents :: TVar (Set.Set EventId)
initLockedEventsCtx :: STM LockedEventsCtx
initLockedEventsCtx = do
leCronEvents <- newTVar Set.empty
leStandAloneEvents <- newTVar Set.empty
leEvents <- newTVar Set.empty
return $ LockedEventsCtx{..}
-- | After the events are fetched from the DB, we store the locked events
-- in a hash set(order doesn't matter and look ups are faster) in the
-- event engine context
saveLockedEvents :: [Text] -> TVar (Set.Set Text) -> IO ()
saveLockedEvents eventIds lockedEvents =
atomically $ do
lockedEventsVals <- readTVar lockedEvents
writeTVar lockedEvents $!
Set.union lockedEventsVals $ Set.fromList eventIds
-- | Remove an event from the 'LockedEventsCtx' after it has been processed
removeEventFromLockedEvents :: Text -> TVar (Set.Set Text) -> IO ()
removeEventFromLockedEvents eventId lockedEvents =
atomically $ do
lockedEventsVals <- readTVar lockedEvents
writeTVar lockedEvents $! Set.delete eventId lockedEventsVals
@ -55,6 +55,7 @@ import Data.String
import Data.Time.Clock
import Data.Word
import Hasura.Eventing.HTTP
import Hasura.Eventing.Common
import Hasura.HTTP
import Hasura.Prelude
import Hasura.RQL.DDL.Headers
@ -71,7 +72,6 @@ import qualified Hasura.Logging as L
import qualified Network.HTTP.Client as HTTP
import qualified Database.PG.Query.PTI as PTI
import qualified PostgreSQL.Binary.Encoding as PE
import qualified Data.Set as Set
data TriggerMetadata
= TriggerMetadata { tmName :: TriggerName }
@ -105,7 +105,6 @@ data EventEngineCtx
= EventEngineCtx
{ _eeCtxEventThreadsCapacity :: TVar Int
, _eeCtxFetchInterval :: DiffTime
, _eeCtxLockedEvents :: TVar (Set.Set EventId)
data DeliveryInfo
@ -147,7 +146,6 @@ defaultFetchInterval = seconds 1
initEventEngineCtx :: Int -> DiffTime -> STM EventEngineCtx
initEventEngineCtx maxT _eeCtxFetchInterval = do
_eeCtxEventThreadsCapacity <- newTVar maxT
_eeCtxLockedEvents <- newTVar Set.empty
return $ EventEngineCtx{..}
-- | Service events from our in-DB queue.
@ -161,9 +159,9 @@ initEventEngineCtx maxT _eeCtxFetchInterval = do
-- - limit webhook HTTP concurrency per HASURA_GRAPHQL_EVENTS_HTTP_POOL_SIZE
:: (HasVersion) => L.Logger L.Hasura -> LogEnvHeaders -> HTTP.Manager-> Q.PGPool
-> IO SchemaCache -> EventEngineCtx
-> IO SchemaCache -> EventEngineCtx -> LockedEventsCtx
-> IO void
processEventQueue logger logenv httpMgr pool getSchemaCache eeCtx@EventEngineCtx{..} = do
processEventQueue logger logenv httpMgr pool getSchemaCache eeCtx@EventEngineCtx{..} LockedEventsCtx {leEvents}= do
events0 <- popEventsBatch
go events0 0 False
@ -175,26 +173,9 @@ processEventQueue logger logenv httpMgr pool getSchemaCache eeCtx@EventEngineCtx
L.unLogger logger $ EventInternalErr err
return []
Right events -> do
saveLockedEvents events
saveLockedEvents (map eId events) leEvents
return events
-- After the events are fetched from the DB, we store the locked events
-- in a hash set(order doesn't matter and look ups are faster) in the
-- event engine context
saveLockedEvents :: [Event] -> IO ()
saveLockedEvents evts =
liftIO $ atomically $ do
lockedEvents <- readTVar _eeCtxLockedEvents
let evtsIds = map eId evts
let newLockedEvents = Set.union lockedEvents (Set.fromList evtsIds)
writeTVar _eeCtxLockedEvents $! newLockedEvents
removeEventFromLockedEvents :: EventId -> IO ()
removeEventFromLockedEvents eventId = do
liftIO $ atomically $ do
lockedEvents <- readTVar _eeCtxLockedEvents
writeTVar _eeCtxLockedEvents $! Set.delete eventId lockedEvents
-- work on this batch of events while prefetching the next. Recurse after we've forked workers
-- for each in the batch, minding the requested pool size.
go :: [Event] -> Int -> Bool -> IO void
@ -211,7 +192,7 @@ processEventQueue logger logenv httpMgr pool getSchemaCache eeCtx@EventEngineCtx
runReaderT (withEventEngineCtx eeCtx $ (processEvent event)) (logger, httpMgr)
-- removing an event from the _eeCtxLockedEvents after the event has
-- been processed
removeEventFromLockedEvents (eId event)
removeEventFromLockedEvents (eId event) leEvents
wait eventsNextA
let lenEvents = length events
@ -282,10 +263,10 @@ withEventEngineCtx ::
withEventEngineCtx eeCtx = bracket_ (decrementThreadCount eeCtx) (incrementThreadCount eeCtx)
incrementThreadCount :: MonadIO m => EventEngineCtx -> m ()
incrementThreadCount (EventEngineCtx c _ _) = liftIO $ atomically $ modifyTVar' c (+1)
incrementThreadCount (EventEngineCtx c _) = liftIO $ atomically $ modifyTVar' c (+1)
decrementThreadCount :: MonadIO m => EventEngineCtx -> m ()
decrementThreadCount (EventEngineCtx c _ _) = liftIO $ atomically $ do
decrementThreadCount (EventEngineCtx c _) = liftIO $ atomically $ do
countThreads <- readTVar c
if countThreads > 0
then modifyTVar' c (\v -> v - 1)
@ -66,10 +66,16 @@ module Hasura.Eventing.ScheduledTrigger
, generateScheduleTimes
, insertCronEvents
, StandAloneScheduledEvent(..)
, initLockedEventsCtx
, LockedEventsCtx(..)
, unlockCronEvents
, unlockStandaloneScheduledEvents
, unlockAllLockedScheduledEvents
) where
import Control.Arrow.Extended (dup)
import Control.Concurrent.Extended (sleep)
import Control.Concurrent.STM.TVar
import Data.Has
import Data.Int (Int64)
import Data.List (unfoldr)
@ -83,6 +89,7 @@ import Hasura.Server.Version (HasVersion)
import Hasura.RQL.DDL.EventTrigger (getHeaderInfosFromConf)
import Hasura.SQL.DML
import Hasura.SQL.Types
import Hasura.Eventing.Common
import System.Cron
import qualified Data.Aeson as J
@ -96,6 +103,10 @@ import qualified Hasura.Logging as L
import qualified Network.HTTP.Client as HTTP
import qualified Text.Builder as TB (run)
import qualified PostgreSQL.Binary.Decoding as PD
import qualified Data.Set as Set
import qualified Database.PG.Query.PTI as PTI
import qualified PostgreSQL.Binary.Encoding as PE
newtype ScheduledTriggerInternalErr
= ScheduledTriggerInternalErr QErr
@ -141,6 +152,8 @@ instance Q.FromCol ScheduledEventStatus where
instance J.ToJSON ScheduledEventStatus where
toJSON = J.String . scheduledEventStatusToText
type ScheduledEventId = Text
data CronTriggerStats
= CronTriggerStats
{ ctsName :: !TriggerName
@ -156,7 +169,7 @@ data CronEventSeed
data CronEventPartial
= CronEventPartial
{ cepId :: !Text
{ cepId :: !CronEventId
, cepName :: !TriggerName
, cepScheduledTime :: !UTCTime
, cepTries :: !Int
@ -164,7 +177,7 @@ data CronEventPartial
data ScheduledEventFull
= ScheduledEventFull
{ sefId :: !Text
{ sefId :: !ScheduledEventId
, sefName :: !(Maybe TriggerName)
-- ^ sefName is the name of the cron trigger.
-- A standalone scheduled event is not associated with a name, so in that
@ -182,7 +195,7 @@ $(J.deriveToJSON (J.aesonDrop 3 J.snakeCase) {J.omitNothingFields = True} ''Sche
data StandAloneScheduledEvent
= StandAloneScheduledEvent
{ saseId :: !Text
{ saseId :: !StandAloneScheduledEventId
, saseScheduledTime :: !UTCTime
, saseTries :: !Int
, saseWebhook :: !InputWebhook
@ -318,14 +331,19 @@ processCronEvents
-> HTTP.Manager
-> Q.PGPool
-> IO SchemaCache
-> TVar (Set.Set CronEventId)
-> IO ()
processCronEvents logger logEnv httpMgr pgpool getSC = do
processCronEvents logger logEnv httpMgr pgpool getSC lockedCronEvents = do
cronTriggersInfo <- scCronTriggers <$> getSC
cronScheduledEvents <-
runExceptT $
Q.runTx pgpool (Q.ReadCommitted, Just Q.ReadWrite) getPartialCronEvents
case cronScheduledEvents of
Right partialEvents ->
Right partialEvents -> do
-- save the locked standalone events that have been fetched from the
-- database, the events stored here will be unlocked in case a
-- graceful shutdown is initiated in midst of processing these events
saveLockedEvents (map cepId partialEvents) lockedCronEvents
for_ partialEvents $ \(CronEventPartial id' name st tries)-> do
case Map.lookup name cronTriggersInfo of
Nothing -> logInternalError $
@ -345,6 +363,7 @@ processCronEvents logger logEnv httpMgr pgpool getSC = do
finally <- runExceptT $
runReaderT (processScheduledEvent logEnv pgpool scheduledEvent CronScheduledEvent) (logger, httpMgr)
removeEventFromLockedEvents id' lockedCronEvents
either logInternalError pure finally
Left err -> logInternalError err
@ -356,13 +375,18 @@ processStandAloneEvents
-> LogEnvHeaders
-> HTTP.Manager
-> Q.PGPool
-> TVar (Set.Set StandAloneScheduledEventId)
-> IO ()
processStandAloneEvents logger logEnv httpMgr pgpool = do
processStandAloneEvents logger logEnv httpMgr pgpool lockedStandAloneEvents = do
standAloneScheduledEvents <-
runExceptT $
Q.runTx pgpool (Q.ReadCommitted, Just Q.ReadWrite) getOneOffScheduledEvents
case standAloneScheduledEvents of
Right standAloneScheduledEvents' ->
Right standAloneScheduledEvents' -> do
-- save the locked standalone events that have been fetched from the
-- database, the events stored here will be unlocked in case a
-- graceful shutdown is initiated in midst of processing these events
saveLockedEvents (map saseId standAloneScheduledEvents') lockedStandAloneEvents
for_ standAloneScheduledEvents' $
\(StandAloneScheduledEvent id'
@ -394,6 +418,7 @@ processStandAloneEvents logger logEnv httpMgr pgpool = do
finally <- runExceptT $
runReaderT (processScheduledEvent logEnv pgpool scheduledEvent StandAloneEvent) $
(logger, httpMgr)
removeEventFromLockedEvents id' lockedStandAloneEvents
either logInternalError pure finally
Left headerInfoErr -> logInternalError headerInfoErr
@ -411,11 +436,12 @@ processScheduledTriggers
-> HTTP.Manager
-> Q.PGPool
-> IO SchemaCache
-> LockedEventsCtx
-> IO void
processScheduledTriggers logger logEnv httpMgr pgpool getSC=
processScheduledTriggers logger logEnv httpMgr pgpool getSC LockedEventsCtx {..} =
forever $ do
processCronEvents logger logEnv httpMgr pgpool getSC
processStandAloneEvents logger logEnv httpMgr pgpool
processCronEvents logger logEnv httpMgr pgpool getSC leCronEvents
processStandAloneEvents logger logEnv httpMgr pgpool leStandAloneEvents
sleep (minutes 1)
processScheduledEvent ::
@ -684,3 +710,50 @@ getOneOffScheduledEvents = do
liftExceptTIO :: (MonadError e m, MonadIO m) => ExceptT e IO a -> m a
liftExceptTIO m = liftEither =<< liftIO (runExceptT m)
newtype ScheduledEventIdArray =
ScheduledEventIdArray { unScheduledEventIdArray :: [ScheduledEventId]}
deriving (Show, Eq)
instance Q.ToPrepArg ScheduledEventIdArray where
toPrepVal (ScheduledEventIdArray l) = Q.toPrepValHelper PTI.unknown encoder $ l
-- 25 is the OID value of TEXT, https://jdbc.postgresql.org/development/privateapi/constant-values.html
encoder = PE.array 25 . PE.dimensionArray foldl' (PE.encodingArray . PE.text_strict)
unlockCronEvents :: [ScheduledEventId] -> Q.TxE QErr Int
unlockCronEvents scheduledEventIds =
(runIdentity . Q.getRow) <$> Q.withQE defaultTxErrorHandler
WITH "cte" AS
(UPDATE hdb_catalog.hdb_cron_events
SET status = 'scheduled'
WHERE id = ANY($1::text[]) and status = 'locked'
SELECT count(*) FROM "cte"
|] (Identity $ ScheduledEventIdArray scheduledEventIds) True
unlockStandaloneScheduledEvents :: [ScheduledEventId] -> Q.TxE QErr Int
unlockStandaloneScheduledEvents scheduledEventIds =
(runIdentity . Q.getRow) <$> Q.withQE defaultTxErrorHandler
WITH "cte" AS
(UPDATE hdb_catalog.hdb_scheduled_events
SET status = 'scheduled'
WHERE id = ANY($1::text[]) AND status = 'locked'
SELECT count(*) FROM "cte"
|] (Identity $ ScheduledEventIdArray scheduledEventIds) True
unlockAllLockedScheduledEvents :: Q.TxE QErr ()
unlockAllLockedScheduledEvents = do
Q.unitQE defaultTxErrorHandler [Q.sql|
UPDATE hdb_catalog.hdb_cron_events
SET status = 'scheduled'
WHERE status = 'locked'
|] () True
Q.unitQE defaultTxErrorHandler [Q.sql|
UPDATE hdb_catalog.hdb_scheduled_events
SET status = 'scheduled'
WHERE status = 'locked'
|] () True
@ -5,6 +5,8 @@ module Hasura.RQL.Types.ScheduledTrigger
, CreateCronTrigger(..)
, STRetryConf(..)
, CreateScheduledEvent(..)
, CronEventId
, StandAloneScheduledEventId
, formatTime'
, defaultSTRetryConf
) where
@ -25,6 +27,10 @@ import qualified Data.Aeson as J
import qualified Data.Text as T
import qualified Hasura.RQL.Types.EventTrigger as ET
type CronEventId = Text
type StandAloneScheduledEventId = Text
data STRetryConf
= STRetryConf
{ strcNumRetries :: !Int
Reference in New Issue
Block a user