From 2c397f9f4f7628d6eb893974e273ecf3884d8a17 Mon Sep 17 00:00:00 2001 From: Anon Ray Date: Thu, 30 Jul 2020 08:04:50 +0530 Subject: [PATCH] server: stop eventing subsystem threads when shutting down (#5479) * server: stop eventing subsystem threads when shutting down * Apply suggestions from code review Co-authored-by: Karthikeyan Chinnakonda Co-authored-by: Phil Freeman Co-authored-by: Phil Freeman Co-authored-by: Karthikeyan Chinnakonda --- server/src-lib/Hasura/App.hs | 68 ++++++++++++++++++++---------------- 1 file changed, 38 insertions(+), 30 deletions(-) diff --git a/server/src-lib/Hasura/App.hs b/server/src-lib/Hasura/App.hs index b50767a070b..18f537801ed 100644 --- a/server/src-lib/Hasura/App.hs +++ b/server/src-lib/Hasura/App.hs @@ -2,11 +2,12 @@ module Hasura.App where -import Control.Concurrent.STM.TVar (readTVarIO, TVar) +import Control.Concurrent.STM.TVar (TVar, readTVarIO) import Control.Exception (throwIO) import Control.Lens (view, _2) import Control.Monad.Base -import Control.Monad.Catch (MonadCatch, MonadMask, MonadThrow, onException, Exception) +import Control.Monad.Catch (Exception, MonadCatch, MonadMask, + MonadThrow, onException) import Control.Monad.Morph (hoist) import Control.Monad.Stateless import Control.Monad.STM (atomically) @@ -21,12 +22,13 @@ import System.Mem (performMajorGC) import qualified Control.Concurrent.Async.Lifted.Safe as LA import qualified Control.Concurrent.Extended as C +import qualified Control.Immortal as Immortal import qualified Data.Aeson as A import qualified Data.ByteString.Char8 as BC import qualified Data.ByteString.Lazy.Char8 as BLC +import qualified Data.Environment as Env import qualified Data.Set as Set import qualified Data.Text as T -import qualified Data.Environment as Env import qualified Data.Time.Clock as Clock import qualified Data.Yaml as Y import qualified Database.PG.Query as Q @@ -35,18 +37,17 @@ import qualified Network.HTTP.Client.TLS as HTTP import qualified Network.Wai.Handler.Warp as Warp import qualified System.Log.FastLogger as FL import qualified Text.Mustache.Compile as M -import qualified Control.Immortal as Immortal -import Hasura.GraphQL.Transport.HTTP (MonadExecuteQuery(..)) import Hasura.Db import Hasura.EncJSON -import Hasura.Eventing.EventTrigger import Hasura.Eventing.Common +import Hasura.Eventing.EventTrigger import Hasura.Eventing.ScheduledTrigger import Hasura.GraphQL.Execute (MonadGQLExecutionCheck (..), checkQueryInAllowlist) import Hasura.GraphQL.Logging (MonadQueryLog (..), QueryLog (..)) import Hasura.GraphQL.Resolve.Action (asyncActionsProcessor) +import Hasura.GraphQL.Transport.HTTP (MonadExecuteQuery (..)) import Hasura.GraphQL.Transport.HTTP.Protocol (toParsed) import Hasura.Logging import Hasura.Prelude @@ -71,9 +72,9 @@ import Hasura.Server.Telemetry import Hasura.Server.Version import Hasura.Session -import qualified Hasura.Tracing as Tracing -import qualified Hasura.GraphQL.Transport.WebSocket.Server as WS import qualified Hasura.GraphQL.Execute.LiveQuery.Poll as EL +import qualified Hasura.GraphQL.Transport.WebSocket.Server as WS +import qualified Hasura.Tracing as Tracing data ExitCode = InvalidEnvironmentVariableOptionsError @@ -365,14 +366,14 @@ runHGEServer env ServeOptions{..} InitCtx{..} pgExecCtx initTime shutdownApp pos fetchI = milliseconds $ fromMaybe (Milliseconds defaultFetchInterval) soEventsFetchInterval logEnvHeaders = soLogHeadersFromEnv - lockedEventsCtx <- liftIO $ atomically $ initLockedEventsCtx + lockedEventsCtx <- liftIO $ atomically initLockedEventsCtx -- prepare event triggers data prepareEvents _icPgPool logger eventEngineCtx <- liftIO $ atomically $ initEventEngineCtx maxEvThrds fetchI unLogger logger $ mkGenericStrLog LevelInfo "event_triggers" "starting workers" - _eventQueueThread <- C.forkImmortal "processEventQueue" logger $ + eventQueueThread <- C.forkImmortal "processEventQueue" logger $ processEventQueue logger logEnvHeaders _icHttpManager _icPgPool (getSCFromRef cacheRef) eventEngineCtx lockedEventsCtx @@ -381,35 +382,42 @@ runHGEServer env ServeOptions{..} InitCtx{..} pgExecCtx initTime shutdownApp pos asyncActionsProcessor env logger (_scrCache cacheRef) _icPgPool _icHttpManager -- start a background thread to create new cron events - void $ liftIO $ C.forkImmortal "runCronEventsGenerator" logger $ + cronEventsThread <- liftIO $ C.forkImmortal "runCronEventsGenerator" logger $ runCronEventsGenerator logger _icPgPool (getSCFromRef cacheRef) -- prepare scheduled triggers prepareScheduledEvents _icPgPool logger -- start a background thread to deliver the scheduled events - void $ C.forkImmortal "processScheduledTriggers" logger $ + scheduledEventsThread <- C.forkImmortal "processScheduledTriggers" logger $ processScheduledTriggers env logger logEnvHeaders _icHttpManager _icPgPool (getSCFromRef cacheRef) lockedEventsCtx -- start a background thread to check for updates updateThread <- C.forkImmortal "checkForUpdates" logger $ liftIO $ checkForUpdates loggerCtx _icHttpManager - -- startTelemetry logger serveOpts cacheRef initCtx -- start a background thread for telemetry - when soEnableTelemetry $ do - unLogger logger $ mkGenericStrLog LevelInfo "telemetry" telemetryNotice + telemetryThread <- if soEnableTelemetry + then do + unLogger logger $ mkGenericStrLog LevelInfo "telemetry" telemetryNotice - (dbId, pgVersion) <- liftIO $ runTxIO _icPgPool (Q.ReadCommitted, Nothing) $ - (,) <$> getDbId <*> getPgVersion + (dbId, pgVersion) <- liftIO $ runTxIO _icPgPool (Q.ReadCommitted, Nothing) $ + (,) <$> getDbId <*> getPgVersion - void $ C.forkImmortal "runTelemetry" logger $ liftIO $ - runTelemetry logger _icHttpManager (getSCFromRef cacheRef) dbId _icInstanceId pgVersion + telemetryThread <- C.forkImmortal "runTelemetry" logger $ liftIO $ + runTelemetry logger _icHttpManager (getSCFromRef cacheRef) dbId _icInstanceId pgVersion + return $ Just telemetryThread + else return Nothing - - - -- events has its own shutdown mechanism, used in 'shutdownHandler' - let immortalThreads = [schemaSyncListenerThread, schemaSyncProcessorThread, updateThread, asyncActionsThread] + -- all the immortal threads are collected so that they can be stopped when gracefully shutting down + let immortalThreads = [ schemaSyncListenerThread + , schemaSyncProcessorThread + , updateThread + , asyncActionsThread + , eventQueueThread + , scheduledEventsThread + , cronEventsThread + ] <> maybe [] pure telemetryThread finishTime <- liftIO Clock.getCurrentTime let apiInitTime = realToFrac $ Clock.diffUTCTime finishTime initTime @@ -473,7 +481,7 @@ runHGEServer env ServeOptions{..} InitCtx{..} pgExecCtx initTime shutdownApp pos -> IO () unlockEventsForShutdown pool (Logger logger) triggerType eventType doUnlock lockedIdsVar = do lockedIds <- readTVarIO lockedIdsVar - unless (Set.null lockedIds) do + unless (Set.null lockedIds) $ do result <- runTx pool (Q.ReadCommitted, Nothing) (doUnlock $ toList lockedIds) case result of Left err -> logger $ mkGenericStrLog LevelWarn triggerType $ @@ -514,7 +522,7 @@ runHGEServer env ServeOptions{..} InitCtx{..} pgExecCtx initTime shutdownApp pos shutdownApp cleanLoggerCtx loggerCtx --- | The RTS's idle GC doesn't work for us: +-- | The RTS's idle GC doesn't work for us: -- -- - when `-I` is too low it may fire continuously causing scary high CPU -- when idle among other issues (see #2565) @@ -523,19 +531,19 @@ runHGEServer env ServeOptions{..} InitCtx{..} pgExecCtx initTime shutdownApp pos -- see memory stay high after finishing). In the theoretical worst case -- there is such low haskell heap pressure that we never run finalizers to -- free the foreign data from e.g. libpq. --- - `-Iw` is not yet implemented in 8.10.1: https://gitlab.haskell.org/ghc/ghc/-/issues/18433 +-- - `-Iw` is not yet implemented in 8.10.1: https://gitlab.haskell.org/ghc/ghc/-/issues/18433 -- - even if it was these two knobs would still not give us a guarentee that -- a major GC would always run at some minumum frequency (e.g. for finalizers) -- -- ...so we hack together our own using GHC.Stats, which should have -- insignificant runtime overhead. -ourIdleGC - :: Logger Hasura +ourIdleGC + :: Logger Hasura -> DiffTime -- ^ Run a major GC when we've been "idle" for idleInterval -> DiffTime -- ^ ...as long as it has been > minGCInterval time since the last major GC -> DiffTime -- ^ Additionally, if it has been > maxNoGCInterval time, force a GC regardless. -> IO void -ourIdleGC (Logger logger) idleInterval minGCInterval maxNoGCInterval = +ourIdleGC (Logger logger) idleInterval minGCInterval maxNoGCInterval = startTimer >>= go 0 0 where go gcs_prev major_gcs_prev timerSinceLastMajorGC = do @@ -558,7 +566,7 @@ ourIdleGC (Logger logger) idleInterval minGCInterval maxNoGCInterval = -- we are idle and its a good time to do a GC, or we're overdue and must run a GC: | areIdle || areOverdue -> do when (areOverdue && not areIdle) $ - logger $ UnstructuredLog LevelWarn $ + logger $ UnstructuredLog LevelWarn $ "Overdue for a major GC: forcing one even though we don't appear to be idle" performMajorGC startTimer >>= go (gcs+1) (major_gcs+1)