Merge branch 'master' into 5363-default-bounded-plan-cache

This commit is contained in:
Brandon Simmons 2020-07-30 10:38:20 -04:00 committed by GitHub
commit 78b2251df8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -2,11 +2,12 @@
module Hasura.App where
import Control.Concurrent.STM.TVar (readTVarIO, TVar)
import Control.Concurrent.STM.TVar (TVar, readTVarIO)
import Control.Exception (throwIO)
import Control.Lens (view, _2)
import Control.Monad.Base
import Control.Monad.Catch (MonadCatch, MonadMask, MonadThrow, onException, Exception)
import Control.Monad.Catch (Exception, MonadCatch, MonadMask,
MonadThrow, onException)
import Control.Monad.Morph (hoist)
import Control.Monad.Stateless
import Control.Monad.STM (atomically)
@ -21,12 +22,13 @@ import System.Mem (performMajorGC)
import qualified Control.Concurrent.Async.Lifted.Safe as LA
import qualified Control.Concurrent.Extended as C
import qualified Control.Immortal as Immortal
import qualified Data.Aeson as A
import qualified Data.ByteString.Char8 as BC
import qualified Data.ByteString.Lazy.Char8 as BLC
import qualified Data.Environment as Env
import qualified Data.Set as Set
import qualified Data.Text as T
import qualified Data.Environment as Env
import qualified Data.Time.Clock as Clock
import qualified Data.Yaml as Y
import qualified Database.PG.Query as Q
@ -35,18 +37,17 @@ import qualified Network.HTTP.Client.TLS as HTTP
import qualified Network.Wai.Handler.Warp as Warp
import qualified System.Log.FastLogger as FL
import qualified Text.Mustache.Compile as M
import qualified Control.Immortal as Immortal
import Hasura.GraphQL.Transport.HTTP (MonadExecuteQuery(..))
import Hasura.Db
import Hasura.EncJSON
import Hasura.Eventing.EventTrigger
import Hasura.Eventing.Common
import Hasura.Eventing.EventTrigger
import Hasura.Eventing.ScheduledTrigger
import Hasura.GraphQL.Execute (MonadGQLExecutionCheck (..),
checkQueryInAllowlist)
import Hasura.GraphQL.Logging (MonadQueryLog (..), QueryLog (..))
import Hasura.GraphQL.Resolve.Action (asyncActionsProcessor)
import Hasura.GraphQL.Transport.HTTP (MonadExecuteQuery (..))
import Hasura.GraphQL.Transport.HTTP.Protocol (toParsed)
import Hasura.Logging
import Hasura.Prelude
@ -71,9 +72,9 @@ import Hasura.Server.Telemetry
import Hasura.Server.Version
import Hasura.Session
import qualified Hasura.Tracing as Tracing
import qualified Hasura.GraphQL.Transport.WebSocket.Server as WS
import qualified Hasura.GraphQL.Execute.LiveQuery.Poll as EL
import qualified Hasura.GraphQL.Transport.WebSocket.Server as WS
import qualified Hasura.Tracing as Tracing
data ExitCode
= InvalidEnvironmentVariableOptionsError
@ -365,14 +366,14 @@ runHGEServer env ServeOptions{..} InitCtx{..} pgExecCtx initTime shutdownApp pos
fetchI = milliseconds $ fromMaybe (Milliseconds defaultFetchInterval) soEventsFetchInterval
logEnvHeaders = soLogHeadersFromEnv
lockedEventsCtx <- liftIO $ atomically $ initLockedEventsCtx
lockedEventsCtx <- liftIO $ atomically initLockedEventsCtx
-- prepare event triggers data
prepareEvents _icPgPool logger
eventEngineCtx <- liftIO $ atomically $ initEventEngineCtx maxEvThrds fetchI
unLogger logger $ mkGenericStrLog LevelInfo "event_triggers" "starting workers"
_eventQueueThread <- C.forkImmortal "processEventQueue" logger $
eventQueueThread <- C.forkImmortal "processEventQueue" logger $
processEventQueue logger logEnvHeaders
_icHttpManager _icPgPool (getSCFromRef cacheRef) eventEngineCtx lockedEventsCtx
@ -381,35 +382,42 @@ runHGEServer env ServeOptions{..} InitCtx{..} pgExecCtx initTime shutdownApp pos
asyncActionsProcessor env logger (_scrCache cacheRef) _icPgPool _icHttpManager
-- start a background thread to create new cron events
void $ liftIO $ C.forkImmortal "runCronEventsGenerator" logger $
cronEventsThread <- liftIO $ C.forkImmortal "runCronEventsGenerator" logger $
runCronEventsGenerator logger _icPgPool (getSCFromRef cacheRef)
-- prepare scheduled triggers
prepareScheduledEvents _icPgPool logger
-- start a background thread to deliver the scheduled events
void $ C.forkImmortal "processScheduledTriggers" logger $
scheduledEventsThread <- C.forkImmortal "processScheduledTriggers" logger $
processScheduledTriggers env logger logEnvHeaders _icHttpManager _icPgPool (getSCFromRef cacheRef) lockedEventsCtx
-- start a background thread to check for updates
updateThread <- C.forkImmortal "checkForUpdates" logger $ liftIO $
checkForUpdates loggerCtx _icHttpManager
-- startTelemetry logger serveOpts cacheRef initCtx
-- start a background thread for telemetry
when soEnableTelemetry $ do
telemetryThread <- if soEnableTelemetry
then do
unLogger logger $ mkGenericStrLog LevelInfo "telemetry" telemetryNotice
(dbId, pgVersion) <- liftIO $ runTxIO _icPgPool (Q.ReadCommitted, Nothing) $
(,) <$> getDbId <*> getPgVersion
void $ C.forkImmortal "runTelemetry" logger $ liftIO $
telemetryThread <- C.forkImmortal "runTelemetry" logger $ liftIO $
runTelemetry logger _icHttpManager (getSCFromRef cacheRef) dbId _icInstanceId pgVersion
return $ Just telemetryThread
else return Nothing
-- events has its own shutdown mechanism, used in 'shutdownHandler'
let immortalThreads = [schemaSyncListenerThread, schemaSyncProcessorThread, updateThread, asyncActionsThread]
-- all the immortal threads are collected so that they can be stopped when gracefully shutting down
let immortalThreads = [ schemaSyncListenerThread
, schemaSyncProcessorThread
, updateThread
, asyncActionsThread
, eventQueueThread
, scheduledEventsThread
, cronEventsThread
] <> maybe [] pure telemetryThread
finishTime <- liftIO Clock.getCurrentTime
let apiInitTime = realToFrac $ Clock.diffUTCTime finishTime initTime
@ -473,7 +481,7 @@ runHGEServer env ServeOptions{..} InitCtx{..} pgExecCtx initTime shutdownApp pos
-> IO ()
unlockEventsForShutdown pool (Logger logger) triggerType eventType doUnlock lockedIdsVar = do
lockedIds <- readTVarIO lockedIdsVar
unless (Set.null lockedIds) do
unless (Set.null lockedIds) $ do
result <- runTx pool (Q.ReadCommitted, Nothing) (doUnlock $ toList lockedIds)
case result of
Left err -> logger $ mkGenericStrLog LevelWarn triggerType $