server: simplify shutdown logic, improve resource management (#218) (#195)

* Remove unused ExitCode constructors

* Simplify shutdown logic

* Update server/src-lib/Hasura/App.hs

Co-authored-by: Brandon Simmons <brandon@hasura.io>

* WIP: fix zombie thread issue

* Use forkCodensity for the schema sync thread

* Use forkCodensity for the oauthTokenUpdateWorker

* Use forkCodensity for the schema update processor thread

* Add deprecation notice

* Logger threads use Codensity

* Add the MonadFix instance for Codensity to get log-sender thread logs

* Move ourIdleGC out to the top level, WIP

* Update forkImmortal function for more logging info

* add back the idle GC to Pro

* setupAuth

* use ImmortalThreadLog

* Fix tests

* Add another finally block

* loud warnings

* Change log level

* hlint

* Finalize the logger in the correct place

* Add ManagedT

* Update server/src-lib/Hasura/Server/Auth.hs

Co-authored-by: Brandon Simmons <brandon@hasura.io>

* Comments etc.

Co-authored-by: Brandon Simmons <brandon@hasura.io>
Co-authored-by: Naveen Naidu <naveennaidu479@gmail.com>
GitOrigin-RevId: 156065c5c3ace0e13d1997daef6921cc2e9f641c
Phil Freeman 2020-12-21 10:56:00 -08:00 committed by hasura-bot
parent 792de5ba7c
commit 2dfbf99b41
11 changed files with 358 additions and 232 deletions

View File

@ -83,6 +83,8 @@ and be accessible according to the permissions that were configured for the role
- server: support joining Int or String scalar types to ID scalar type in remote relationship
- server: add support for POSIX operators (close #4317) (#6172)
- server: do not block catalog migration on inconsistent metadata
- server: update `forkImmortal` function to log more information, i.e. log the starting of threads and log asynchronous and synchronous exceptions.
- server: various changes to ensure timely cleanup of background threads and other resources in the event of a SIGTERM signal.
- console: allow user to cascade Postgres dependencies when dropping Postgres objects (close #5109) (#5248)
- console: mark inconsistent remote schemas in the UI (close #5093) (#5181)
- console: remove ONLY as default for ALTER TABLE in column alter operations (close #5512) #5706

View File

@ -120,6 +120,7 @@ library
, unordered-containers >= 0.2.12
, template-haskell
, hashable
, kan-extensions
, transformers
, transformers-base
, http-types
@ -286,6 +287,7 @@ library
, Control.Concurrent.Extended
, Control.Lens.Extended
, Control.Monad.Stateless
, Control.Monad.Trans.Managed
, Control.Monad.Unique
, Data.Aeson.Extended
, Data.Aeson.Ordered
@ -356,6 +358,7 @@ library
-- Exposed for testing:
, Hasura.Server.Telemetry.Counters
, Hasura.Server.Auth.JWT
, Hasura.GC
, Hasura.GraphQL.Execute
, Hasura.GraphQL.Execute.LiveQuery
, Hasura.GraphQL.Transport.HTTP
@ -505,6 +508,7 @@ executable graphql-engine
build-depends: base
, graphql-engine
, bytestring
, kan-extensions
, pg-client
, text
, text-conversions
@ -528,6 +532,7 @@ test-suite graphql-engine-tests
, http-client-tls
, jose
, lifted-base
, kan-extensions
, monad-control
, mtl
, natural-transformation >=0.4 && <0.5

View File

@ -3,30 +3,33 @@
module Main where
import Control.Exception
import Data.Int (Int64)
import Data.Text.Conversions (convertText)
import Data.Time.Clock (getCurrentTime)
import Data.Time.Clock.POSIX (getPOSIXTime)
import Control.Monad.Trans.Managed (ManagedT(..), lowerManagedT)
import Data.Int (Int64)
import Data.Text.Conversions (convertText)
import Data.Time.Clock (getCurrentTime)
import Data.Time.Clock.POSIX (getPOSIXTime)
import Hasura.App
import Hasura.Logging (Hasura, LogLevel (..), defaultEnabledEngineLogTypes)
import Hasura.Logging (Hasura, LogLevel (..), defaultEnabledEngineLogTypes)
import Hasura.Metadata.Class
import Hasura.Prelude
import Hasura.RQL.DDL.Schema
import Hasura.RQL.Types
import Hasura.Server.Init
import Hasura.Server.Migrate (downgradeCatalog, dropCatalog)
import Hasura.Server.Migrate (downgradeCatalog, dropCatalog)
import Hasura.Server.Version
import qualified Data.ByteString.Char8 as BC
import qualified Data.ByteString.Lazy as BL
import qualified Data.ByteString.Lazy.Char8 as BLC
import qualified Data.Environment as Env
import qualified Database.PG.Query as Q
import qualified Hasura.Tracing as Tracing
import qualified System.Exit as Sys
import qualified System.Metrics as EKG
import qualified System.Posix.Signals as Signals
import qualified Control.Concurrent.Extended as C
import qualified Data.ByteString.Char8 as BC
import qualified Data.ByteString.Lazy as BL
import qualified Data.ByteString.Lazy.Char8 as BLC
import qualified Data.Environment as Env
import qualified Database.PG.Query as Q
import qualified Hasura.GC as GC
import qualified Hasura.Tracing as Tracing
import qualified System.Exit as Sys
import qualified System.Metrics as EKG
import qualified System.Posix.Signals as Signals
main :: IO ()
@ -47,8 +50,6 @@ runApp env (HGEOptionsG rci hgeCmd) = do
withVersion $$(getVersionFromEnvironment) $ case hgeCmd of
HCServe serveOptions -> do
serveCtx <- initialiseServeCtx env globalCtx serveOptions
ekgStore <- liftIO do
s <- EKG.newStore
EKG.registerGcMetrics s
@ -59,19 +60,27 @@ runApp env (HGEOptionsG rci hgeCmd) = do
EKG.registerCounter "ekg.server_timestamp_ms" getTimeMs s
pure s
let shutdownApp = return ()
-- Catches the SIGTERM signal and initiates a graceful shutdown.
-- Graceful shutdown for regular HTTP requests is already implemented in
-- Warp, and is triggered by invoking the 'closeSocket' callback.
-- We only catch the SIGTERM signal once, that is, if the user hits CTRL-C
-- once again, we terminate the process immediately.
_ <- liftIO $ Signals.installHandler
Signals.sigTERM
(Signals.CatchOnce (shutdownGracefully $ _scShutdownLatch serveCtx))
Nothing
serverMetrics <- liftIO $ createServerMetrics ekgStore
flip runPGMetadataStorageApp (_scPgPool serveCtx) $
runHGEServer env serveOptions serveCtx Nothing initTime shutdownApp Nothing serverMetrics ekgStore
-- It'd be nice if we didn't have to call runManagedT twice here, but
-- there is a data dependency problem since the call to runPGMetadataStorageApp
-- below depends on serveCtx.
runManagedT (initialiseServeCtx env globalCtx serveOptions) $ \serveCtx -> do
-- Catches the SIGTERM signal and initiates a graceful shutdown.
-- Graceful shutdown for regular HTTP requests is already implemented in
-- Warp, and is triggered by invoking the 'closeSocket' callback.
-- We only catch the SIGTERM signal once, that is, if the user hits CTRL-C
-- once again, we terminate the process immediately.
_ <- liftIO $ Signals.installHandler
Signals.sigTERM
(Signals.CatchOnce (shutdownGracefully $ _scShutdownLatch serveCtx))
Nothing
let Loggers _ logger _ = _scLoggers serveCtx
_idleGCThread <- C.forkImmortal "ourIdleGC" logger $
GC.ourIdleGC logger (seconds 0.3) (seconds 10) (seconds 60)
serverMetrics <- liftIO $ createServerMetrics ekgStore
flip runPGMetadataStorageApp (_scPgPool serveCtx) . lowerManagedT $ do
runHGEServer env serveOptions serveCtx Nothing initTime Nothing serverMetrics ekgStore
HCExport -> do
res <- runTxWithMinimalPool _gcConnInfo fetchMetadataFromCatalog
@ -85,18 +94,18 @@ runApp env (HGEOptionsG rci hgeCmd) = do
HCExecute -> do
queryBs <- liftIO BL.getContents
let sqlGenCtx = SQLGenCtx False
pool <- mkMinimalPool _gcConnInfo
res <- flip runPGMetadataStorageApp pool $
runMetadataStorageT $ liftEitherM $
runAsAdmin pool sqlGenCtx RemoteSchemaPermsDisabled _gcHttpManager $ do
metadata <- liftTx fetchMetadataFromCatalog
schemaCache <- buildRebuildableSchemaCache env metadata
execQuery env queryBs
& Tracing.runTraceTWithReporter Tracing.noReporter "execute"
& runMetadataT metadata
& runCacheRWT schemaCache
& fmap (\((res, _), _, _) -> res)
either (printErrJExit ExecuteProcessError) (liftIO . BLC.putStrLn) res
runManagedT (mkMinimalPool _gcConnInfo) $ \pool -> do
res <- flip runPGMetadataStorageApp pool $
runMetadataStorageT $ liftEitherM $
runAsAdmin pool sqlGenCtx RemoteSchemaPermsDisabled _gcHttpManager $ do
metadata <- liftTx fetchMetadataFromCatalog
schemaCache <- buildRebuildableSchemaCache env metadata
execQuery env queryBs
& Tracing.runTraceTWithReporter Tracing.noReporter "execute"
& runMetadataT metadata
& runCacheRWT schemaCache
& fmap (\((res, _), _, _) -> res)
either (printErrJExit ExecuteProcessError) (liftIO . BLC.putStrLn) res
HCDowngrade opts -> do
res <- runTxWithMinimalPool _gcConnInfo $ downgradeCatalog opts initTime
@ -104,7 +113,7 @@ runApp env (HGEOptionsG rci hgeCmd) = do
HCVersion -> liftIO $ putStrLn $ "Hasura GraphQL Engine: " ++ convertText currentVersion
where
runTxWithMinimalPool connInfo tx = do
runTxWithMinimalPool connInfo tx = lowerManagedT $ do
minimalPool <- mkMinimalPool connInfo
liftIO $ runExceptT $ Q.runTx minimalPool (Q.ReadCommitted, Nothing) tx
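
An aside on the graceful-shutdown flow above: the SIGTERM handler only opens the shutdown latch; the Warp shutdown handler installed in runHGEServer blocks on that latch before closing the socket and running the ManagedT finalizers. Below is a minimal sketch of such a latch, assuming an MVar-based design (the real ShutdownLatch and its helpers live elsewhere in the codebase and are not part of this diff).

-- Editor's sketch, not part of this commit: an idempotent shutdown latch.
import Control.Concurrent.MVar (MVar, newEmptyMVar, readMVar, tryPutMVar)
import Control.Monad (void)

newtype Latch = Latch (MVar ())

newLatch :: IO Latch
newLatch = Latch <$> newEmptyMVar

-- Called from the SIGTERM handler (or any other shutdown trigger); safe to call twice.
openLatch :: Latch -> IO ()
openLatch (Latch m) = void (tryPutMVar m ())

-- The Warp shutdown handler blocks here until the latch is opened.
awaitLatch :: Latch -> IO ()
awaitLatch (Latch m) = readMVar m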

View File

@ -4,6 +4,7 @@ module Control.Concurrent.Extended
, ForkableMonadIO
-- * Robust forking
, forkImmortal
, forkManagedT
-- * Deprecated
, threadDelay
, forkIO
@ -11,6 +12,7 @@ module Control.Concurrent.Extended
import Prelude
import Control.Exception
import Control.Monad.Trans.Managed (ManagedT(..), allocate)
import Control.Monad.IO.Class
import Control.Monad
import Data.Aeson
@ -21,8 +23,9 @@ import qualified Control.Concurrent.Async.Lifted.Safe as LA
import qualified Control.Immortal as Immortal
import qualified Control.Monad.Trans.Control as MC
import Control.Concurrent hiding (threadDelay, forkIO)
import Data.Time.Clock.Units (seconds, Microseconds (..), DiffTime)
import Control.Concurrent hiding (threadDelay, forkIO)
import Data.Time.Clock.Units (seconds, Microseconds (..), DiffTime)
-- For forkImmortal. We could also have it take a cumbersome continuation if we
-- want to break this dependency. Probably best to move Hasura.Logging into a
@ -46,6 +49,8 @@ threadDelay = Base.threadDelay
forkIO :: IO () -> IO ThreadId
forkIO = Base.forkIO
-- | Note: Please consider using 'forkManagedT' instead to ensure reliable
-- resource cleanup.
forkImmortal
:: ForkableMonadIO m
=> String
@ -57,25 +62,56 @@ forkImmortal
-> m Immortal.Thread
-- ^ A handle for the forked thread. See "Control.Immortal".
forkImmortal label logger m =
Immortal.createWithLabel label $ \this ->
Immortal.createWithLabel label $ \this -> do
-- Log that the thread has started
liftIO $ unLogger logger (ImmortalThreadRestarted label )
-- In this case, we are handling unexpected exceptions.
-- i.e. this does not catch the asynchronous exception which stops the thread.
Immortal.onUnexpectedFinish this logAndPause (void m)
where logAndPause = \case
Right _void -> pure () -- absurd _void (i.e. unreachable)
Left e -> liftIO $ do
liftIO $ unLogger logger $
ImmortalThreadLog label e
-- pause before restarting some arbitrary amount of time. The idea is not to flood
-- logs or cause other cascading failures.
sleep (seconds 1)
where logAndPause = \case
Right _void -> pure () -- absurd _void (i.e. unreachable)
Left e -> liftIO $ do
liftIO $ unLogger logger (ImmortalThreadUnexpectedException label e)
-- pause before restarting some arbitrary amount of time. The idea is not to flood
-- logs or cause other cascading failures.
sleep (seconds 1)
data ImmortalThreadLog = ImmortalThreadLog String SomeException
-- | This function pairs a call to 'forkImmortal' with a finalizer which stops
-- the immortal thread.
--
-- Note, the thread object can leave its scope if this function is incorrectly
-- used. Generally, the result should only be used later in the same ManagedT
-- scope.
forkManagedT
:: ForkableMonadIO m
=> String
-> Logger Hasura
-> m Void
-> ManagedT m Immortal.Thread
forkManagedT label logger m = allocate
(forkImmortal label logger m)
(\thread -> do
unLogger logger (ImmortalThreadStopping label)
liftIO $ Immortal.stop thread)
data ImmortalThreadLog
= ImmortalThreadUnexpectedException String SomeException
-- ^ Synchronous Exception
| ImmortalThreadStopping String
-- ^ Asynchronous Exception about to be sent
| ImmortalThreadRestarted String
instance ToEngineLog ImmortalThreadLog Hasura where
toEngineLog (ImmortalThreadLog label e) =
toEngineLog (ImmortalThreadStopping label) =
(LevelInfo, ELTInternal ILTUnstructured, toJSON msg)
where msg = "Stopping immortal " <> label <> " thread"
toEngineLog (ImmortalThreadUnexpectedException label e) =
(LevelError, ELTInternal ILTUnstructured, toJSON msg)
where msg = "Unexpected exception in immortal thread \""<>label<>"\" (it will be restarted):\n"
where msg = "Unexpected exception in immortal thread " <> label <> " (it will be restarted):\n"
<> show e
toEngineLog (ImmortalThreadRestarted label) =
(LevelInfo, ELTInternal ILTUnstructured, toJSON msg)
where msg = "Thread " <> label <> " (re)started"
-- TODO
-- - maybe use this everywhere, but also:
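
As a usage illustration for the new forkManagedT (an editor's sketch, not part of this diff; it assumes sleep and seconds are exported as they are used elsewhere in this module):

import Control.Concurrent.Extended (forkManagedT, sleep)
import Control.Monad (forever)
import Control.Monad.Trans.Managed (ManagedT)
import Data.Time.Clock.Units (seconds)
import Hasura.Logging (Hasura, Logger)
import qualified Control.Immortal as Immortal

-- A background worker paired with its finalizer: when the surrounding
-- ManagedT scope exits, ImmortalThreadStopping is logged and the thread
-- is stopped via Immortal.stop.
ticker :: Logger Hasura -> ManagedT IO Immortal.Thread
ticker logger = forkManagedT "ticker" logger $
  forever $ sleep (seconds 5) -- placeholder work loop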

View File

@ -0,0 +1,74 @@
{-# LANGUAGE DerivingVia #-}
module Control.Monad.Trans.Managed where
import Prelude
import Control.Exception.Lifted (bracket, bracket_)
import Control.Monad.Codensity (Codensity(..))
import Control.Monad.Fix (MonadFix(..))
import Control.Monad.IO.Class (MonadIO, liftIO)
import Control.Monad.Reader.Class (MonadReader)
import Control.Monad.State.Class (MonadState)
import Control.Monad.Trans (MonadTrans(..))
import Control.Monad.Trans.Control (MonadBaseControl)
import Control.Monad.Trans.Reader (ReaderT(..))
import GHC.IO.Unsafe (unsafeDupableInterleaveIO)
import qualified Control.Concurrent as C
-- | This type is like a transformer version of the @Managed@ monad from the
-- @managed@ library. It can be used to manage resources by pairing together
-- their allocation with their finalizers.
--
-- The documentation for the @managed@ library is an excellent introduction to
-- the idea here.
--
-- We could use 'Codensity' directly, but we'd have to define an orphan instance
-- for 'MonadFix'. This also gives us the opportunity to give it a slightly more
-- friendly name.
--
-- We could also have used @ResourceT@, but that would have involved writing
-- instances for @MonadUnliftIO@. That could still be a good option to consider
-- later, however.
newtype ManagedT m a = ManagedT { runManagedT :: forall r. (a -> m r) -> m r }
deriving ( Functor
, Applicative
, Monad
, MonadIO
, MonadReader r
, MonadState s
) via (Codensity m)
deriving MonadTrans via Codensity
-- | Allocate a resource by providing setup and finalizer actions.
allocate :: MonadBaseControl IO m => m a -> (a -> m b) -> ManagedT m a
allocate setup finalize = ManagedT (bracket setup finalize)
-- | Allocate a resource but do not return a reference to it.
allocate_ :: MonadBaseControl IO m => m a -> m b -> ManagedT m ()
allocate_ setup finalize = ManagedT (\k -> bracket_ setup finalize (k ()))
-- | Run the provided computation by returning its result, and run any finalizers.
-- Watch out: this function might leak finalized resources.
lowerManagedT :: Monad m => ManagedT m a -> m a
lowerManagedT m = runManagedT m return
hoistManagedTReaderT :: Monad m => r -> ManagedT (ReaderT r m) a -> ManagedT m a
hoistManagedTReaderT r cod = ManagedT $ \k ->
runReaderT (runManagedT cod (lift . k)) r
-- | We need this instance to tie the knot when initializing resources.
-- It'd be nice if we could do this with a 'MonadFix' constraint on the underlying
-- monad, but here we just use 'MonadIO' to tie the knot using a lazily-evaluated
-- 'MVar'-based promise for the eventual result.
--
-- We need to be careful not to leak allocated resources via the use of
-- recursively-defined monadic actions when making use of this instance.
instance MonadIO m => MonadFix (ManagedT m) where
mfix f = ManagedT \k -> do
m <- liftIO C.newEmptyMVar
ans <- liftIO $ unsafeDupableInterleaveIO (C.readMVar m)
runManagedT (f ans) \a -> do
liftIO $ C.putMVar m a
k a
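
A small usage sketch for the new module (editor's illustration, not part of this commit): allocate pairs acquisition with a finalizer, and the finalizer runs when the ManagedT computation finishes, whether normally or via an exception.

import Control.Monad.IO.Class (liftIO)
import Control.Monad.Trans.Managed (ManagedT, allocate, lowerManagedT)
import System.IO (Handle, IOMode (WriteMode), hClose, hPutStrLn, openFile)

-- Acquire a file handle whose finalizer closes it when the scope exits.
managedLogFile :: FilePath -> ManagedT IO Handle
managedLogFile path = allocate (openFile path WriteMode) hClose

main :: IO ()
main = lowerManagedT $ do
  h <- managedLogFile "example.log"
  liftIO $ hPutStrLn h "the handle is closed when this block finishes"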

View File

@ -8,6 +8,7 @@ import Control.Exception (bracket_, throwIO)
import Control.Monad.Base
import Control.Monad.Catch (Exception, MonadCatch, MonadMask,
MonadThrow, onException)
import Control.Monad.Trans.Managed (ManagedT(..), allocate)
import Control.Monad.Morph (hoist)
import Control.Monad.Stateless
import Control.Monad.STM (atomically)
@ -17,15 +18,12 @@ import Data.Time.Clock (UTCTime)
#ifndef PROFILING
import GHC.AssertNF
#endif
import GHC.Stats
import Options.Applicative
import System.Environment (getEnvironment)
import System.Mem (performMajorGC)
import qualified Control.Concurrent.Async.Lifted.Safe as LA
import qualified Control.Concurrent.Extended as C
import qualified Control.Immortal as Immortal
import qualified Data.Aeson as J
import qualified Control.Exception.Lifted as LE
import qualified Data.Aeson as A
import qualified Data.ByteString.Char8 as BC
import qualified Data.ByteString.Lazy.Char8 as BLC
@ -82,19 +80,17 @@ import qualified System.Metrics.Gauge as EKG.Gauge
data ExitCode
-- these are used during server initialization:
= InvalidEnvironmentVariableOptionsError
| InvalidDatabaseConnectionParamsError
| MetadataCatalogFetchingError
| AuthConfigurationError
| EventSubSystemError
| EventEnvironmentVariableError
| DatabaseMigrationError
-- these are used in app/Main.hs:
| MetadataExportError
| MetadataCleanError
| DatabaseMigrationError
| ExecuteProcessError
| DowngradeProcessError
| UnexpectedHasuraError
| ExitFailureError Int
deriving Show
data ExitException
@ -203,11 +199,11 @@ newtype PGMetadataStorageApp a
-- | Initializes or migrates the catalog and returns the context required to start the server.
initialiseServeCtx
:: (HasVersion, MonadIO m, MonadBaseControl IO m, MonadCatch m)
:: (HasVersion, C.ForkableMonadIO m, MonadCatch m)
=> Env.Environment
-> GlobalCtx
-> ServeOptions Hasura
-> m ServeCtx
-> ManagedT m ServeCtx
initialiseServeCtx env GlobalCtx{..} so@ServeOptions{..} = do
instanceId <- liftIO generateInstanceId
latch <- liftIO newShutdownLatch
@ -225,7 +221,7 @@ initialiseServeCtx env GlobalCtx{..} so@ServeOptions{..} = do
(schemaSyncListenerThread, schemaSyncEventRef) <- startSchemaSyncListenerThread pool logger instanceId
(rebuildableSchemaCache, cacheInitStartTime) <-
flip onException (flushLogger loggerCtx) $ migrateCatalogSchema env logger pool _gcHttpManager sqlGenCtx soEnableRemoteSchemaPermissions
lift . flip onException (flushLogger loggerCtx) $ migrateCatalogSchema env logger pool _gcHttpManager sqlGenCtx soEnableRemoteSchemaPermissions
let schemaSyncCtx = SchemaSyncCtx schemaSyncListenerThread schemaSyncEventRef cacheInitStartTime
initCtx = ServeCtx _gcHttpManager instanceId loggers _gcConnInfo pool latch
@ -233,10 +229,12 @@ initialiseServeCtx env GlobalCtx{..} so@ServeOptions{..} = do
pure initCtx
mkLoggers
:: (MonadIO m)
=> HashSet (EngineLogType Hasura) -> LogLevel -> m Loggers
:: (MonadIO m, MonadBaseControl IO m)
=> HashSet (EngineLogType Hasura)
-> LogLevel
-> ManagedT m Loggers
mkLoggers enabledLogs logLevel = do
loggerCtx <- liftIO $ mkLoggerCtx (defaultLoggerSettings True logLevel) enabledLogs
loggerCtx <- mkLoggerCtx (defaultLoggerSettings True logLevel) enabledLogs
let logger = mkLogger loggerCtx
pgLogger = mkPGLogger logger
return $ Loggers loggerCtx logger pgLogger
@ -305,9 +303,30 @@ createServerMetrics store = do
smWarpThreads <- EKG.createGauge "warp_threads" store
pure ServerMetrics { .. }
-- | This function acts as the entrypoint for the graphql-engine webserver.
--
-- Note: at the exit of this function, or in case of a graceful server shutdown
-- (SIGTERM, or more generally, whenever the shutdown latch is set), we need to
-- make absolutely sure that we clean up any resources which were allocated during
-- server setup. In the case of a multitenant process, failure to do so can lead to
-- resource leaks.
--
-- To track these resources, we use the ManagedT monad, and attach finalizers at
-- the same point in the code where we allocate resources. If you fork a new
-- long-lived thread, or create a connection pool, or allocate any other
-- long-lived resource, make sure to pair the allocator with its finalizer.
-- There are plenty of examples throughout the code. For example, see
-- 'C.forkManagedT'.
--
-- Note also: the order in which the finalizers run can be important. Specifically,
-- we want the finalizers for the logger threads to run last, so that we retain as
-- many "thread stopping" log messages as possible. The order in which the
-- finalizers are run is determined by the order in which they are introduced in the
-- code.
{- HLINT ignore runHGEServer "Avoid lambda" -}
runHGEServer
:: ( HasVersion
:: forall m impl
. ( HasVersion
, MonadIO m
, MonadMask m
, MonadStateless IO m
@ -334,13 +353,11 @@ runHGEServer
-- and mutations
-> UTCTime
-- ^ start time
-> IO ()
-- ^ shutdown function
-> Maybe EL.LiveQueryPostPollHook
-> ServerMetrics
-> EKG.Store
-> m ()
runHGEServer env ServeOptions{..} ServeCtx{..} pgExecCtx initTime shutdownApp postPollHook serverMetrics ekgStore = do
-> ManagedT m ()
runHGEServer env ServeOptions{..} ServeCtx{..} pgExecCtx initTime postPollHook serverMetrics ekgStore = do
-- Comment this to enable expensive assertions from "GHC.AssertNF". These
-- will log lines to STDOUT containing "not in normal form". In the future we
-- could try to integrate this into our tests. For now this is a development
@ -360,10 +377,7 @@ runHGEServer env ServeOptions{..} ServeCtx{..} pgExecCtx initTime shutdownApp po
authMode <- onLeft authModeRes (printErrExit AuthConfigurationError . T.unpack)
_idleGCThread <- C.forkImmortal "ourIdleGC" logger $ liftIO $
ourIdleGC logger (seconds 0.3) (seconds 10) (seconds 60)
HasuraApp app cacheRef stopWsServer <- flip onException (flushLogger loggerCtx) $
HasuraApp app cacheRef stopWsServer <- lift $ flip onException (flushLogger loggerCtx) $
mkWaiApp env
soTxIso
logger
@ -395,77 +409,66 @@ runHGEServer env ServeOptions{..} ServeCtx{..} pgExecCtx initTime shutdownApp po
liftIO $ logInconsObjs logger inconsObjs
-- Start a background thread for processing schema sync event present in the '_sscSyncEventRef'
schemaSyncProcessorThread <- startSchemaSyncProcessorThread sqlGenCtx _scPgPool
logger _scHttpManager _sscSyncEventRef
cacheRef _scInstanceId _sscCacheInitStartTime soEnableRemoteSchemaPermissions
_ <- startSchemaSyncProcessorThread sqlGenCtx _scPgPool
logger _scHttpManager _sscSyncEventRef
cacheRef _scInstanceId _sscCacheInitStartTime soEnableRemoteSchemaPermissions
let
maxEvThrds = fromMaybe defaultMaxEventThreads soEventsHttpPoolSize
fetchI = milliseconds $ fromMaybe (Milliseconds defaultFetchInterval) soEventsFetchInterval
logEnvHeaders = soLogHeadersFromEnv
lockedEventsCtx <- liftIO $ atomically initLockedEventsCtx
lockedEventsCtx <- allocate
(liftIO $ atomically initLockedEventsCtx)
(\lockedEventsCtx ->
liftWithStateless \lowerIO ->
shutdownEvents _scPgPool (\a b -> hoist lowerIO (unlockScheduledEvents a b)) logger lockedEventsCtx)
-- prepare event triggers data
eventEngineCtx <- liftIO $ atomically $ initEventEngineCtx maxEvThrds fetchI
unLogger logger $ mkGenericStrLog LevelInfo "event_triggers" "starting workers"
eventQueueThread <- C.forkImmortal "processEventQueue" logger $
_eventQueueThread <- C.forkManagedT "processEventQueue" logger $
processEventQueue logger logEnvHeaders
_scHttpManager _scPgPool (getSCFromRef cacheRef) eventEngineCtx lockedEventsCtx
-- start a background thread to handle async actions
asyncActionsThread <- C.forkImmortal "asyncActionsProcessor" logger $
_asyncActionsThread <- C.forkManagedT "asyncActionsProcessor" logger $
asyncActionsProcessor env logger (_scrCache cacheRef) _scHttpManager
-- start a background thread to create new cron events
cronEventsThread <- C.forkImmortal "runCronEventsGenerator" logger $
_cronEventsThread <- C.forkManagedT "runCronEventsGenerator" logger $
runCronEventsGenerator logger (getSCFromRef cacheRef)
-- prepare scheduled triggers
prepareScheduledEvents logger
lift $ prepareScheduledEvents logger
-- start a background thread to deliver the scheduled events
scheduledEventsThread <- C.forkImmortal "processScheduledTriggers" logger $
_scheduledEventsThread <- C.forkManagedT "processScheduledTriggers" logger $
processScheduledTriggers env logger logEnvHeaders _scHttpManager (getSCFromRef cacheRef) lockedEventsCtx
-- start a background thread to check for updates
updateThread <- C.forkImmortal "checkForUpdates" logger $ liftIO $
_updateThread <- C.forkManagedT "checkForUpdates" logger $ liftIO $
checkForUpdates loggerCtx _scHttpManager
-- start a background thread for telemetry
telemetryThread <- if soEnableTelemetry
_telemetryThread <- if soEnableTelemetry
then do
unLogger logger $ mkGenericStrLog LevelInfo "telemetry" telemetryNotice
lift . unLogger logger $ mkGenericStrLog LevelInfo "telemetry" telemetryNotice
(dbId, pgVersion) <- liftIO $ runTxIO _scPgPool (Q.ReadCommitted, Nothing) $
(,) <$> getDbId <*> getPgVersion
telemetryThread <- C.forkImmortal "runTelemetry" logger $ liftIO $
telemetryThread <- C.forkManagedT "runTelemetry" logger $ liftIO $
runTelemetry logger _scHttpManager (getSCFromRef cacheRef) dbId _scInstanceId pgVersion
return $ Just telemetryThread
else return Nothing
-- all the immortal threads are collected so that they can be stopped when gracefully shutting down
let immortalThreads = [ _sscListenerThreadId
, schemaSyncProcessorThread
, updateThread
, asyncActionsThread
, eventQueueThread
, scheduledEventsThread
, cronEventsThread
] <> onNothing telemetryThread []
finishTime <- liftIO Clock.getCurrentTime
let apiInitTime = realToFrac $ Clock.diffUTCTime finishTime initTime
unLogger logger $
mkGenericLog LevelInfo "server" $ StartupTimeInfo "starting API server" apiInitTime
shutdownHandler' <- liftWithStateless $ \lowerIO ->
pure $ shutdownHandler _scLoggers immortalThreads stopWsServer lockedEventsCtx _scPgPool $
\a b -> hoist lowerIO $ unlockScheduledEvents a b
-- Install a variant of forkIOWithUnmask which tracks Warp threads using an EKG metric
let setForkIOWithMetrics :: Warp.Settings -> Warp.Settings
setForkIOWithMetrics = Warp.setFork \f -> do
void $ C.forkIOWithUnmask (\unmask ->
@ -473,14 +476,28 @@ runHGEServer env ServeOptions{..} ServeCtx{..} pgExecCtx initTime shutdownApp po
(EKG.Gauge.inc $ smWarpThreads serverMetrics)
(EKG.Gauge.dec $ smWarpThreads serverMetrics)
(f unmask))
let shutdownHandler closeSocket = LA.link =<< LA.async do
waitForShutdown _scShutdownLatch
unLogger logger $ mkGenericStrLog LevelInfo "server" "gracefully shutting down server"
closeSocket
let warpSettings = Warp.setPort soPort
. Warp.setHost soHost
. Warp.setGracefulShutdownTimeout (Just 30) -- 30s graceful shutdown
. Warp.setInstallShutdownHandler shutdownHandler'
. Warp.setInstallShutdownHandler shutdownHandler
. setForkIOWithMetrics
$ Warp.defaultSettings
liftIO $ Warp.runSettings warpSettings app
-- Here we block until the shutdown latch 'MVar' is filled, and then
-- shut down the server. Once this blocking call returns, we'll tidy up
-- any resources using the finalizers attached using 'ManagedT' above.
-- Structuring things using the shutdown latch in this way lets us decide
-- elsewhere exactly how we want to control shutdown.
liftIO $ Warp.runSettings warpSettings app `LE.finally` do
-- These cleanup actions are not directly associated with any
-- resource, but we still need to make sure we clean them up here.
stopWsServer
where
-- | prepareScheduledEvents is a function to unlock all the scheduled trigger
@ -543,90 +560,6 @@ runHGEServer env ServeOptions{..} ServeCtx{..} pgExecCtx initTime shutdownApp po
runTx pool txLevel tx =
liftIO $ runExceptT $ Q.runTx pool txLevel tx
-- | Waits for the shutdown latch 'MVar' to be filled, and then
-- shuts down the server and associated resources.
-- Structuring things this way lets us decide elsewhere exactly how
-- we want to control shutdown.
shutdownHandler
:: Loggers
-> [Immortal.Thread]
-> IO ()
-- ^ the stop websocket server function
-> LockedEventsCtx
-> Q.PGPool
-> (ScheduledEventType -> [ScheduledEventId] -> MetadataStorageT IO Int)
-> IO ()
-- ^ the closeSocket callback
-> IO ()
shutdownHandler (Loggers loggerCtx (Logger logger) _) immortalThreads stopWsServer leCtx pool unlockScheduledEvents' closeSocket =
LA.link =<< LA.async do
waitForShutdown _scShutdownLatch
logger $ mkGenericStrLog LevelInfo "server" "gracefully shutting down server"
shutdownEvents pool unlockScheduledEvents' (Logger logger) leCtx
closeSocket
stopWsServer
-- kill all the background immortal threads
logger $ mkGenericStrLog LevelInfo "server" "killing all background immortal threads"
forM_ immortalThreads $ \thread -> do
logger $ mkGenericStrLog LevelInfo "server" $ "killing thread: " <> show (Immortal.threadId thread)
Immortal.stop thread
shutdownApp
cleanLoggerCtx loggerCtx
-- | The RTS's idle GC doesn't work for us:
--
-- - when `-I` is too low it may fire continuously causing scary high CPU
-- when idle among other issues (see #2565)
-- - when we set it higher it won't run at all leading to memory being
-- retained when idle (especially noticeable when users are benchmarking and
-- see memory stay high after finishing). In the theoretical worst case
-- there is such low haskell heap pressure that we never run finalizers to
-- free the foreign data from e.g. libpq.
-- - as of GHC 8.10.2 we have access to `-Iw`, but those two knobs still
-- dont give us a guarantee that a major GC will always run at some
-- minumum frequency (e.g. for finalizers)
--
-- ...so we hack together our own using GHC.Stats, which should have
-- insignificant runtime overhead.
ourIdleGC
:: Logger Hasura
-> DiffTime -- ^ Run a major GC when we've been "idle" for idleInterval
-> DiffTime -- ^ ...as long as it has been > minGCInterval time since the last major GC
-> DiffTime -- ^ Additionally, if it has been > maxNoGCInterval time, force a GC regardless.
-> IO void
ourIdleGC (Logger logger) idleInterval minGCInterval maxNoGCInterval =
startTimer >>= go 0 0
where
go gcs_prev major_gcs_prev timerSinceLastMajorGC = do
timeSinceLastGC <- timerSinceLastMajorGC
when (timeSinceLastGC < minGCInterval) $ do
-- no need to check idle until we've passed the minGCInterval:
C.sleep (minGCInterval - timeSinceLastGC)
RTSStats{gcs, major_gcs} <- getRTSStats
-- We use minor GCs as a proxy for "activity", which seems to work
-- well-enough (in tests it stays stable for a few seconds when we're
-- logically "idle" and otherwise increments quickly)
let areIdle = gcs == gcs_prev
areOverdue = timeSinceLastGC > maxNoGCInterval
-- a major GC was run since last iteration (cool!), reset timer:
if | major_gcs > major_gcs_prev -> do
startTimer >>= go gcs major_gcs
-- we are idle and its a good time to do a GC, or we're overdue and must run a GC:
| areIdle || areOverdue -> do
when (areOverdue && not areIdle) $
logger $ UnstructuredLog LevelWarn $
"Overdue for a major GC: forcing one even though we don't appear to be idle"
performMajorGC
startTimer >>= go (gcs+1) (major_gcs+1)
-- else keep the timer running, waiting for us to go idle:
| otherwise -> do
C.sleep idleInterval
go gcs major_gcs timerSinceLastMajorGC
runAsAdmin
:: (MonadIO m, MonadBaseControl IO m)
=> Q.PGPool
@ -769,12 +702,12 @@ mkConsoleHTML :: HasVersion => Text -> AuthMode -> Bool -> Maybe Text -> Either
mkConsoleHTML path authMode enableTelemetry consoleAssetsDir =
renderHtmlTemplate consoleTmplt $
-- variables required to render the template
A.object [ "isAdminSecretSet" J..= isAdminSecretSet authMode
, "consolePath" J..= consolePath
, "enableTelemetry" J..= boolToText enableTelemetry
, "cdnAssets" J..= boolToText (isNothing consoleAssetsDir)
, "assetsVersion" J..= consoleAssetsVersion
, "serverVersion" J..= currentVersion
A.object [ "isAdminSecretSet" A..= isAdminSecretSet authMode
, "consolePath" A..= consolePath
, "enableTelemetry" A..= boolToText enableTelemetry
, "cdnAssets" A..= boolToText (isNothing consoleAssetsDir)
, "assetsVersion" A..= consoleAssetsVersion
, "serverVersion" A..= currentVersion
]
where
consolePath = case path of
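
To make the finalizer-ordering note in the runHGEServer comment concrete: ManagedT nests brackets, so finalizers run in reverse order of allocation, which is why the loggers (allocated first, in initialiseServeCtx) are finalized last. An editor's sketch, assuming only the new Control.Monad.Trans.Managed module:

import Control.Monad.IO.Class (liftIO)
import Control.Monad.Trans.Managed (allocate_, lowerManagedT)

main :: IO ()
main = lowerManagedT $ do
  allocate_ (putStrLn "start logger") (putStrLn "stop logger")
  allocate_ (putStrLn "start worker") (putStrLn "stop worker")
  liftIO $ putStrLn "serving"
  -- prints: start logger, start worker, serving, stop worker, stop logger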

View File

@ -0,0 +1,63 @@
module Hasura.GC where
import Hasura.Prelude
import GHC.Stats
import Hasura.Logging
import System.Mem (performMajorGC)
import qualified Control.Concurrent.Extended as C
-- | The RTS's idle GC doesn't work for us:
--
-- - when `-I` is too low it may fire continuously causing scary high CPU
-- when idle among other issues (see #2565)
-- - when we set it higher it won't run at all leading to memory being
-- retained when idle (especially noticeable when users are benchmarking and
-- see memory stay high after finishing). In the theoretical worst case
-- there is such low haskell heap pressure that we never run finalizers to
-- free the foreign data from e.g. libpq.
-- - as of GHC 8.10.2 we have access to `-Iw`, but those two knobs still
-- don't give us a guarantee that a major GC will always run at some
-- minimum frequency (e.g. for finalizers)
--
-- ...so we hack together our own using GHC.Stats, which should have
-- insignificant runtime overhead.
ourIdleGC
:: Logger Hasura
-> DiffTime -- ^ Run a major GC when we've been "idle" for idleInterval
-> DiffTime -- ^ ...as long as it has been > minGCInterval time since the last major GC
-> DiffTime -- ^ Additionally, if it has been > maxNoGCInterval time, force a GC regardless.
-> IO void
ourIdleGC (Logger logger) idleInterval minGCInterval maxNoGCInterval =
startTimer >>= go 0 0
where
go gcs_prev major_gcs_prev timerSinceLastMajorGC = do
timeSinceLastGC <- timerSinceLastMajorGC
when (timeSinceLastGC < minGCInterval) $ do
-- no need to check idle until we've passed the minGCInterval:
C.sleep (minGCInterval - timeSinceLastGC)
RTSStats{gcs, major_gcs} <- getRTSStats
-- We use minor GCs as a proxy for "activity", which seems to work
-- well-enough (in tests it stays stable for a few seconds when we're
-- logically "idle" and otherwise increments quickly)
let areIdle = gcs == gcs_prev
areOverdue = timeSinceLastGC > maxNoGCInterval
-- a major GC was run since last iteration (cool!), reset timer:
if | major_gcs > major_gcs_prev -> do
startTimer >>= go gcs major_gcs
-- we are idle and it's a good time to do a GC, or we're overdue and must run a GC:
| areIdle || areOverdue -> do
when (areOverdue && not areIdle) $
logger $ UnstructuredLog LevelWarn $
"Overdue for a major GC: forcing one even though we don't appear to be idle"
performMajorGC
startTimer >>= go (gcs+1) (major_gcs+1)
-- else keep the timer running, waiting for us to go idle:
| otherwise -> do
C.sleep idleInterval
go gcs major_gcs timerSinceLastMajorGC

View File

@ -29,6 +29,9 @@ module Hasura.Logging
import Hasura.Prelude
import Control.Monad.Trans.Managed (ManagedT(..), allocate)
import Control.Monad.Trans.Control
import qualified Control.AutoUpdate as Auto
import qualified Data.Aeson as J
import qualified Data.Aeson.Casing as J
@ -247,12 +250,15 @@ getFormattedTime tzM = do
-- format = Format.iso8601DateFormat (Just "%H:%M:%S")
mkLoggerCtx
:: LoggerSettings
:: (MonadIO io, MonadBaseControl IO io)
=> LoggerSettings
-> Set.HashSet (EngineLogType impl)
-> IO (LoggerCtx impl)
-> ManagedT io (LoggerCtx impl)
mkLoggerCtx (LoggerSettings cacheTime tzM logLevel) enabledLogs = do
loggerSet <- FL.newStdoutLoggerSet FL.defaultBufSize
timeGetter <- bool (return $ getFormattedTime tzM) cachedTimeGetter cacheTime
loggerSet <- allocate
(liftIO $ FL.newStdoutLoggerSet FL.defaultBufSize)
(liftIO . FL.rmLoggerSet)
timeGetter <- liftIO $ bool (return $ getFormattedTime tzM) cachedTimeGetter cacheTime
return $ LoggerCtx loggerSet logLevel timeGetter enabledLogs
where
cachedTimeGetter =

View File

@ -26,13 +26,14 @@ module Hasura.Server.Auth
import Hasura.Prelude
import qualified Control.Concurrent.Async.Lifted.Safe as LA
import qualified Crypto.Hash as Crypto
import qualified Data.Text.Encoding as T
import qualified Network.HTTP.Client as H
import qualified Network.HTTP.Types as N
import Control.Concurrent.Extended (forkImmortal)
import Control.Concurrent.Extended (ForkableMonadIO, forkManagedT)
import Control.Monad.Trans.Managed (ManagedT)
import Control.Monad.Morph (hoist)
import Control.Monad.Trans.Control (MonadBaseControl)
import Data.IORef (newIORef)
import Data.Time.Clock (UTCTime)
@ -102,9 +103,7 @@ data AuthMode
-- This must only be run once, on launch.
setupAuthMode
:: ( HasVersion
, MonadIO m
, MonadBaseControl IO m
, LA.Forall (LA.Pure m)
, ForkableMonadIO m
, Tracing.HasReporter m
)
=> Maybe AdminSecretHash
@ -113,7 +112,7 @@ setupAuthMode
-> Maybe RoleName
-> H.Manager
-> Logger Hasura
-> ExceptT Text m AuthMode
-> ExceptT Text (ManagedT m) AuthMode
setupAuthMode mAdminSecretHash mWebHook mJwtSecret mUnAuthRole httpManager logger =
case (mAdminSecretHash, mWebHook, mJwtSecret) of
(Just hash, Nothing, Nothing) -> return $ AMAdminSecret hash mUnAuthRole
@ -148,13 +147,11 @@ setupAuthMode mAdminSecretHash mWebHook mJwtSecret mUnAuthRole httpManager logge
-- mkJwtCtx :: HasVersion => JWTConfig -> m JWTCtx
mkJwtCtx
:: ( HasVersion
, MonadIO m
, MonadBaseControl IO m
, LA.Forall (LA.Pure m)
, ForkableMonadIO m
, Tracing.HasReporter m
)
=> JWTConfig
-> ExceptT Text m JWTCtx
-> ExceptT Text (ManagedT m) JWTCtx
mkJwtCtx JWTConfig{..} = do
jwkRef <- case jcKeyOrUrl of
Left jwk -> liftIO $ newIORef (JWKSet [jwk])
@ -165,24 +162,22 @@ setupAuthMode mAdminSecretHash mWebHook mJwtSecret mUnAuthRole httpManager logge
-- header), do not start a background thread for refreshing the JWK
getJwkFromUrl url = do
ref <- liftIO $ newIORef $ JWKSet []
maybeExpiry <- withJwkError $ Tracing.runTraceT "jwk init" $ updateJwkRef logger httpManager url ref
maybeExpiry <- hoist lift $ withJwkError $ Tracing.runTraceT "jwk init" $ updateJwkRef logger httpManager url ref
case maybeExpiry of
Nothing -> return ref
Just time -> do
void . lift $ forkImmortal "jwkRefreshCtrl" logger $
void . lift $ forkManagedT "jwkRefreshCtrl" logger $
jwkRefreshCtrl logger httpManager url ref (convertDuration time)
return ref
withJwkError act = do
res <- runExceptT act
case res of
Right r -> return r
Left err -> case err of
-- when fetching JWK initially, except expiry parsing error, all errors are critical
JFEHttpException _ msg -> throwError msg
JFEHttpError _ _ _ e -> throwError e
JFEJwkParseError _ e -> throwError e
JFEExpiryParseError _ _ -> return Nothing
onLeft res $ \case
-- when fetching JWK initially, except expiry parsing error, all errors are critical
JFEHttpException _ msg -> throwError msg
JFEHttpError _ _ _ e -> throwError e
JFEJwkParseError _ e -> throwError e
JFEExpiryParseError _ _ -> return Nothing
getUserInfo
:: (HasVersion, MonadIO m, MonadBaseControl IO m, MonadError QErr m, Tracing.MonadTrace m)
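
For the hoist lift used when fetching the initial JWK above: it lifts an ExceptT action over the base monad into ExceptT over ManagedT, keeping the error channel while the surrounding computation gains access to managed allocation (such as forkManagedT). A minimal sketch (editor's illustration; liftToManaged is a hypothetical name, not part of this commit):

import Control.Monad.Morph (hoist)
import Control.Monad.Trans.Class (lift)
import Control.Monad.Trans.Except (ExceptT)
import Control.Monad.Trans.Managed (ManagedT)

-- Run a base-monad ExceptT action inside ExceptT e (ManagedT m).
liftToManaged :: Monad m => ExceptT e m a -> ExceptT e (ManagedT m) a
liftToManaged = hoist lift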

View File

@ -19,6 +19,7 @@ import Hasura.Server.Logging
import Hasura.Server.Types (InstanceId (..))
import Hasura.Session
import Control.Monad.Trans.Managed (ManagedT)
import Control.Monad.Trans.Control (MonadBaseControl)
import Data.Aeson
import Data.Aeson.Casing
@ -161,18 +162,18 @@ if listen started after schema cache init start time.
-- | An async thread which listen to Postgres notify to enable schema syncing
-- See Note [Schema Cache Sync]
startSchemaSyncListenerThread
:: (MonadIO m)
:: C.ForkableMonadIO m
=> PG.PGPool
-> Logger Hasura
-> InstanceId
-> m (Immortal.Thread, SchemaSyncEventRef)
-> ManagedT m (Immortal.Thread, SchemaSyncEventRef)
startSchemaSyncListenerThread pool logger instanceId = do
-- only the latest event is recorded here
-- we don't want to store and process all the events, only the latest event
schemaSyncEventRef <- liftIO $ STM.newTVarIO Nothing
-- Start listener thread
listenerThread <- liftIO $ C.forkImmortal "SchemeUpdate.listener" logger $
listenerThread <- C.forkManagedT "SchemeUpdate.listener" logger . liftIO $
listener pool logger schemaSyncEventRef
logThreadStarted logger instanceId TTListener listenerThread
pure (listenerThread, schemaSyncEventRef)
@ -190,11 +191,11 @@ startSchemaSyncProcessorThread
-> InstanceId
-> UTC.UTCTime
-> RemoteSchemaPermsCtx
-> m Immortal.Thread
-> ManagedT m Immortal.Thread
startSchemaSyncProcessorThread sqlGenCtx pool logger httpMgr
schemaSyncEventRef cacheRef instanceId cacheInitStartTime remoteSchemaPermsCtx = do
-- Start processor thread
processorThread <- C.forkImmortal "SchemeUpdate.processor" logger $
processorThread <- C.forkManagedT "SchemeUpdate.processor" logger $
processor sqlGenCtx pool logger httpMgr schemaSyncEventRef cacheRef instanceId cacheInitStartTime remoteSchemaPermsCtx
logThreadStarted logger instanceId TTProcessor processorThread
pure processorThread

View File

@ -6,6 +6,7 @@ import Hasura.Logging
import Hasura.Prelude
import Hasura.Server.Version
import Control.Monad.Trans.Managed (lowerManagedT)
import Control.Monad.Trans.Control
import qualified Crypto.JOSE.JWK as Jose
import Data.Aeson ((.=))
@ -584,6 +585,7 @@ setupAuthMode' mAdminSecretHash mWebHook mJwtSecret mUnAuthRole =
-- just throw away the error message for ease of testing:
fmap (either (const $ Left ()) Right)
$ runNoReporter
$ lowerManagedT
$ runExceptT
$ setupAuthMode mAdminSecretHash mWebHook mJwtSecret mUnAuthRole
-- NOTE: this won't do any http or launch threads if we don't specify JWT URL: