graphql-engine/server/src-lib/Hasura/App.hs
2023-02-08 03:36:45 +00:00

1311 lines
50 KiB
Haskell

{-# LANGUAGE QuasiQuotes #-}
{-# LANGUAGE TemplateHaskell #-}
{-# LANGUAGE UndecidableInstances #-}
{-# LANGUAGE ViewPatterns #-}
-- | Imported by 'server/src-exec/Main.hs'.
module Hasura.App
( ExitCode (AuthConfigurationError, DatabaseMigrationError, DowngradeProcessError, MetadataCleanError, MetadataExportError, SchemaCacheInitError),
ExitException (ExitException),
GlobalCtx (..),
PGMetadataStorageAppT (runPGMetadataStorageAppT),
accessDeniedErrMsg,
flushLogger,
getCatalogStateTx,
initGlobalCtx,
initAuthMode,
initialiseServerCtx,
initSubscriptionsState,
migrateCatalogSchema,
mkLoggers,
mkPGLogger,
notifySchemaCacheSyncTx,
parseArgs,
throwErrExit,
throwErrJExit,
printJSON,
printYaml,
readTlsAllowlist,
resolvePostgresConnInfo,
runHGEServer,
setCatalogStateTx,
-- * Exported for testing
mkHGEServer,
mkPgSourceResolver,
mkMSSQLSourceResolver,
)
where
import Control.Concurrent.Async.Lifted.Safe qualified as LA
import Control.Concurrent.Extended qualified as C
import Control.Concurrent.STM qualified as STM
import Control.Concurrent.STM.TVar (readTVarIO)
import Control.Exception (bracket_, throwIO)
import Control.Monad.Catch
( Exception,
MonadCatch,
MonadMask,
MonadThrow,
onException,
)
import Control.Monad.Morph (hoist)
import Control.Monad.STM (atomically)
import Control.Monad.Stateless
import Control.Monad.Trans.Control (MonadBaseControl (..))
import Control.Monad.Trans.Managed (ManagedT (..), allocate, allocate_)
import Control.Retry qualified as Retry
import Data.Aeson qualified as A
import Data.ByteString.Char8 qualified as BC
import Data.ByteString.Lazy qualified as BL
import Data.ByteString.Lazy.Char8 qualified as BLC
import Data.Environment qualified as Env
import Data.FileEmbed (makeRelativeToProject)
import Data.HashMap.Strict qualified as HM
import Data.Set.NonEmpty qualified as NE
import Data.Text qualified as T
import Data.Time.Clock (UTCTime)
import Data.Time.Clock qualified as Clock
import Data.Yaml qualified as Y
import Database.MSSQL.Pool qualified as MSPool
import Database.PG.Query qualified as PG
import Database.PG.Query qualified as Q
import GHC.AssertNF.CPP
import Hasura.Backends.MSSQL.Connection
import Hasura.Backends.Postgres.Connection
import Hasura.Base.Error
import Hasura.Eventing.Common
import Hasura.Eventing.EventTrigger
import Hasura.Eventing.ScheduledTrigger
import Hasura.GraphQL.Execute
( ExecutionStep (..),
MonadGQLExecutionCheck (..),
checkQueryInAllowlist,
)
import Hasura.GraphQL.Execute.Action
import Hasura.GraphQL.Execute.Action.Subscription
import Hasura.GraphQL.Execute.Backend qualified as EB
import Hasura.GraphQL.Execute.Subscription.Poll qualified as ES
import Hasura.GraphQL.Execute.Subscription.State qualified as ES
import Hasura.GraphQL.Logging (MonadQueryLog (..))
import Hasura.GraphQL.Schema.Options qualified as Options
import Hasura.GraphQL.Transport.HTTP
( CacheStoreSuccess (CacheStoreSkipped),
MonadExecuteQuery (..),
)
import Hasura.GraphQL.Transport.HTTP.Protocol (toParsed)
import Hasura.GraphQL.Transport.WebSocket.Server qualified as WS
import Hasura.Logging
import Hasura.Metadata.Class
import Hasura.PingSources
import Hasura.Prelude
import Hasura.QueryTags
import Hasura.RQL.DDL.EventTrigger (MonadEventLogCleanup (..))
import Hasura.RQL.DDL.Schema.Cache
import Hasura.RQL.DDL.Schema.Cache.Common
import Hasura.RQL.DDL.Schema.Catalog
import Hasura.RQL.Types.Allowlist
import Hasura.RQL.Types.Backend
import Hasura.RQL.Types.Common
import Hasura.RQL.Types.Eventing.Backend
import Hasura.RQL.Types.Metadata
import Hasura.RQL.Types.Network
import Hasura.RQL.Types.ResizePool
import Hasura.RQL.Types.SchemaCache
import Hasura.RQL.Types.SchemaCache.Build
import Hasura.RQL.Types.Source
import Hasura.SQL.AnyBackend qualified as AB
import Hasura.SQL.Backend
import Hasura.Server.API.Query (requiresAdmin)
import Hasura.Server.App
import Hasura.Server.Auth
import Hasura.Server.CheckUpdates (checkForUpdates)
import Hasura.Server.Init hiding (checkFeatureFlag)
import Hasura.Server.Limits
import Hasura.Server.Logging
import Hasura.Server.Metrics (ServerMetrics (..))
import Hasura.Server.Migrate (migrateCatalog)
import Hasura.Server.Prometheus
( PrometheusMetrics (..),
decWarpThreads,
incWarpThreads,
)
import Hasura.Server.SchemaCacheRef
( SchemaCacheRef,
getSchemaCache,
initialiseSchemaCacheRef,
logInconsistentMetadata,
)
import Hasura.Server.SchemaUpdate
import Hasura.Server.Telemetry
import Hasura.Server.Types
import Hasura.Server.Version
import Hasura.Session
import Hasura.ShutdownLatch
import Hasura.Tracing qualified as Tracing
import Network.HTTP.Client qualified as HTTP
import Network.HTTP.Client.Blocklisting (Blocklist)
import Network.HTTP.Client.CreateManager (mkHttpManager)
import Network.HTTP.Client.Manager (HasHttpManagerM (..))
import Network.Wai (Application)
import Network.Wai.Handler.Warp qualified as Warp
import Options.Applicative
import Refined (unrefine)
import System.Environment (getEnvironment)
import System.Log.FastLogger qualified as FL
import System.Metrics qualified as EKG
import System.Metrics.Gauge qualified as EKG.Gauge
import Text.Mustache.Compile qualified as M
import Web.Spock.Core qualified as Spock
-- | Reason codes attached to an 'ExitException' (see its 'eeCode' field).
-- The constructors are grouped by where they are raised, as noted inline.
data ExitCode
= -- these are used during server initialization:
InvalidEnvironmentVariableOptionsError
| InvalidDatabaseConnectionParamsError
| AuthConfigurationError
| EventSubSystemError
| DatabaseMigrationError
| -- | used by MT because it initialises the schema cache only
-- these are used in app/Main.hs:
SchemaCacheInitError
| MetadataExportError
| MetadataCleanError
| ExecuteProcessError
| DowngradeProcessError
deriving (Show)
-- | Exception that pairs an 'ExitCode' ('eeCode') with an already-rendered
-- message ('eeMessage'). Both fields are strict. It is the exception thrown by
-- 'throwErrExit' and 'throwErrJExit'.
data ExitException = ExitException
{ eeCode :: !ExitCode,
eeMessage :: !BC.ByteString
}
deriving (Show)
-- No methods are overridden, so the 'Exception' class defaults apply.
instance Exception ExitException
-- | Throw an 'ExitException' carrying the given 'ExitCode' and a
-- human-readable message.
--
-- The message is converted with @'txtToBs' . 'T.pack'@, the same conversion
-- this module uses for database URLs, rather than 'BC.pack'. 'BC.pack'
-- truncates every 'Char' to its low 8 bits, which garbles any message
-- containing characters outside Latin-1 (e.g. auth errors echoing user
-- input). Pure-ASCII messages produce exactly the same bytes as before.
throwErrExit :: (MonadIO m) => forall a. ExitCode -> String -> m a
throwErrExit reason = liftIO . throwIO . ExitException reason . txtToBs . T.pack
-- | Like 'throwErrExit', but the message is the JSON encoding of the given
-- value.
throwErrJExit :: (A.ToJSON a, MonadIO m) => forall b. ExitCode -> a -> m b
throwErrJExit reason payload =
  liftIO $ throwIO $ ExitException reason (BLC.toStrict (A.encode payload))
--------------------------------------------------------------------------------
-- TODO(SOLOMON): Move into `Hasura.Server.Init`. Unable to do so
-- currently due to `throwErrExit`.

-- | Parse the command line of the graphql-engine executable, then resolve the
-- raw options against the process environment. If resolution fails, an
-- 'ExitException' with 'InvalidEnvironmentVariableOptionsError' is thrown.
parseArgs :: EnabledLogTypes impl => IO (HGEOptions (ServeOptions impl))
parseArgs = do
  cliOpts <- execParser parserInfo
  processEnv <- getEnvironment
  runWithEnv processEnv (mkHGEOptions cliOpts)
    `onLeft` throwErrExit InvalidEnvironmentVariableOptionsError
  where
    parserInfo = info (helper <*> parseHgeOpts) description
    description =
      mconcat
        [ fullDesc,
          header "Hasura GraphQL Engine: Blazing fast, instant realtime GraphQL APIs on your DB with fine grained access control, also trigger webhooks on database events.",
          footerDoc (Just mainCmdFooter)
        ]
--------------------------------------------------------------------------------
-- | Write the JSON encoding of a value to stdout, followed by a newline.
printJSON :: (A.ToJSON a, MonadIO m) => a -> m ()
printJSON value = liftIO $ BLC.putStrLn (A.encode value)
-- | Write the YAML encoding of a value to stdout, followed by a newline.
printYaml :: (A.ToJSON a, MonadIO m) => a -> m ()
printYaml value = liftIO $ BC.putStrLn (Y.encode value)
-- | Adapt a Hasura 'Logger' into a 'PG.PGLogger': retry messages from the
-- Postgres client are emitted as warning-level 'PGLog' entries.
mkPGLogger :: Logger Hasura -> PG.PGLogger
mkPGLogger hasuraLogger (PG.PLERetryMsg retryMsg) =
  unLogger hasuraLogger (PGLog LevelWarn retryMsg)
-- | Context required for all graphql-engine CLI commands: the connection info
-- of the metadata database plus, when one was supplied, the default Postgres
-- source together with the optional retries setting.
data GlobalCtx = GlobalCtx
{ _gcMetadataDbConnInfo :: !PG.ConnInfo,
-- | --database-url option; the 'UrlConf' is required to construct the default source
-- configuration. The second component of the tuple holds the optional retries.
_gcDefaultPostgresConnInfo :: !(Maybe (UrlConf, PG.ConnInfo), Maybe Int)
}
-- | Read the TLS allowlist out of the current schema cache.
readTlsAllowlist :: SchemaCacheRef -> IO [TlsAllow]
readTlsAllowlist = fmap scTlsAllowlist . getSchemaCache
-- | Build the 'GlobalCtx' from the metadata database URL and the default
-- (user) database configuration.
--
-- * Neither given: throws an 'ExitException' with
--   'InvalidDatabaseConnectionParamsError'.
-- * Only the user database given: it doubles as the metadata storage.
-- * Metadata URL given: it is used for the metadata storage, and the user
--   database (if any) is recorded as the default source.
initGlobalCtx ::
  (MonadIO m) =>
  Env.Environment ->
  -- | the metadata DB URL
  Maybe String ->
  -- | the user's DB URL
  PostgresConnInfo (Maybe UrlConf) ->
  m GlobalCtx
initGlobalCtx env metadataDbUrl defaultPgConnInfo =
  case (metadataDbUrl, sourceUrlConf) of
    (Nothing, Nothing) ->
      throwErrExit
        InvalidDatabaseConnectionParamsError
        "Fatal Error: Either of --metadata-database-url or --database-url option expected"
    -- No metadata storage specified: fall back to the default database.
    (Nothing, Just srcUrl) -> do
      srcInfo <- resolveSource srcUrl
      pure $ globalCtx srcInfo (Just (srcUrl, srcInfo))
    (Just mdUrl, Nothing) ->
      pure $ globalCtx (metadataConnInfo mdUrl) Nothing
    (Just mdUrl, Just srcUrl) -> do
      srcInfo <- resolveSource srcUrl
      pure $ globalCtx (metadataConnInfo mdUrl) (Just (srcUrl, srcInfo))
  where
    PostgresConnInfo sourceUrlConf maybeRetries = defaultPgConnInfo

    -- Resolve the user's database URL configuration against the environment.
    resolveSource srcUrl = resolvePostgresConnInfo env srcUrl maybeRetries

    -- The metadata URL is taken verbatim; retries default to 1.
    metadataConnInfo =
      PG.ConnInfo (fromMaybe 1 maybeRetries) . PG.CDDatabaseURI . txtToBs . T.pack

    globalCtx mdInfo srcInfo = GlobalCtx mdInfo (srcInfo, maybeRetries)
-- | An application with Postgres database as a metadata storage
--
-- The newtype wraps a function from a @(PG.PGPool, PG.PGLogger)@ pair to the
-- underlying monad. All listed instances are borrowed from 'ReaderT' over that
-- same pair via @deriving via@, so the pair is available through 'MonadReader'.
newtype PGMetadataStorageAppT m a = PGMetadataStorageAppT {runPGMetadataStorageAppT :: (PG.PGPool, PG.PGLogger) -> m a}
deriving
( Functor,
Applicative,
Monad,
MonadIO,
MonadFix,
MonadCatch,
MonadThrow,
MonadMask,
HasHttpManagerM,
HasServerConfigCtx,
MonadReader (PG.PGPool, PG.PGLogger),
MonadBase b,
MonadBaseControl b
)
via (ReaderT (PG.PGPool, PG.PGLogger) m)
-- 'MonadTrans' is derived separately because it is an instance for the
-- transformer itself (no @m@ applied).
deriving
( MonadTrans
)
via (ReaderT (PG.PGPool, PG.PGLogger))
-- | Resolve a 'UrlConf' against the environment into a 'PG.ConnInfo'.
-- Retries default to 1 when not supplied. If the URL cannot be resolved, an
-- 'ExitException' with 'InvalidDatabaseConnectionParamsError' is thrown.
resolvePostgresConnInfo ::
  (MonadIO m) => Env.Environment -> UrlConf -> Maybe Int -> m PG.ConnInfo
resolvePostgresConnInfo env dbUrlConf maybeRetries =
  case runExcept (resolveUrlConf env dbUrlConf) of
    Left err -> throwErrJExit InvalidDatabaseConnectionParamsError err
    Right dbUrlText ->
      pure $
        PG.ConnInfo
          (fromMaybe 1 maybeRetries)
          (PG.CDDatabaseURI (txtToBs dbUrlText))
-- | Set up the 'AuthMode' from the serve options. A setup failure throws an
-- 'ExitException' with 'AuthConfigurationError'. On success a background
-- thread that keeps the JWK context up to date is forked before returning.
initAuthMode ::
  (C.ForkableMonadIO m, Tracing.HasReporter m) =>
  ServeOptions impl ->
  HTTP.Manager ->
  Logger Hasura ->
  m AuthMode
initAuthMode ServeOptions {..} httpManager logger = do
  authMode <-
    runExceptT
      (setupAuthMode soAdminSecret soAuthHook soJwtSecret soUnAuthRole logger httpManager)
      `onLeftM` (throwErrExit AuthConfigurationError . T.unpack)
  -- A dedicated polling thread picks up the latest JWK settings set by the
  -- user and updates the JWK accordingly, so that changes apply without
  -- restarting HGE.
  void $ C.forkImmortal "update JWK" logger $ updateJwkCtx authMode httpManager logger
  pure authMode
-- | Create the subscriptions state from the live-query and streaming-query
-- options. When no post-poll hook is supplied, the default hook (which uses
-- the given logger) is installed.
initSubscriptionsState ::
  ServeOptions impl ->
  Logger Hasura ->
  Maybe ES.SubscriptionPostPollHook ->
  IO ES.SubscriptionsState
initSubscriptionsState ServeOptions {..} logger liveQueryHook =
  ES.initSubscriptionsState soLiveQueryOpts soStreamingQueryOpts $
    fromMaybe (ES.defaultSubscriptionPostPollHook logger) liveQueryHook
-- | Initializes or migrates the catalog and returns the context required to start the server.
--
-- Runs in 'ManagedT': the metadata DB pool is allocated together with its
-- finalizer, so the statement order below is significant.
initialiseServerCtx ::
(C.ForkableMonadIO m, MonadCatch m) =>
Env.Environment ->
GlobalCtx ->
ServeOptions Hasura ->
Maybe ES.SubscriptionPostPollHook ->
ServerMetrics ->
PrometheusMetrics ->
Tracing.SamplingPolicy ->
(FeatureFlag -> IO Bool) ->
ManagedT m ServerCtx
initialiseServerCtx env GlobalCtx {..} serveOptions@ServeOptions {..} liveQueryHook serverMetrics prometheusMetrics traceSamplingPolicy checkFeatureFlag = do
instanceId <- liftIO generateInstanceId
latch <- liftIO newShutdownLatch
loggers@(Loggers loggerCtx logger pgLogger) <- mkLoggers soEnabledLogTypes soLogLevel
-- warn (at startup-log level) when the server runs without an admin secret
when (null soAdminSecret) $ do
let errMsg :: Text
errMsg = "WARNING: No admin secret provided"
unLogger logger $
StartupLog
{ slLogLevel = LevelWarn,
slKind = "no_admin_secret",
slInfo = A.toJSON errMsg
}
-- log serve options
unLogger logger $ serveOptsToLog serveOptions
-- log postgres connection info
unLogger logger $ connInfoToLog _gcMetadataDbConnInfo
-- the metadata DB pool is destroyed by its finalizer when the ManagedT scope ends
metadataDbPool <-
allocate
(liftIO $ PG.initPGPool _gcMetadataDbConnInfo soConnParams pgLogger)
(liftIO . PG.destroyPGPool)
-- a default source configuration is only built when a default Postgres
-- connection ('UrlConf') is present in the 'GlobalCtx'
let maybeDefaultSourceConfig =
fst _gcDefaultPostgresConnInfo <&> \(dbUrlConf, _) ->
let connSettings =
PostgresPoolSettings
{ _ppsMaxConnections = Just $ Q.cpConns soConnParams,
_ppsTotalMaxConnections = Nothing,
_ppsIdleTimeout = Just $ Q.cpIdleTime soConnParams,
_ppsRetries = snd _gcDefaultPostgresConnInfo <|> Just 1,
_ppsPoolTimeout = PG.cpTimeout soConnParams,
_ppsConnectionLifetime = PG.cpMbLifetime soConnParams
}
sourceConnInfo = PostgresSourceConnInfo dbUrlConf (Just connSettings) (PG.cpAllowPrepare soConnParams) soTxIso Nothing
in PostgresConnConfiguration sourceConnInfo Nothing defaultPostgresExtensionsSchema Nothing mempty
-- the two experimental-feature toggles below feed into 'SQLGenCtx'
optimizePermissionFilters
| EFOptimizePermissionFilters `elem` soExperimentalFeatures = Options.OptimizePermissionFilters
| otherwise = Options.Don'tOptimizePermissionFilters
bigqueryStringNumericInput
| EFBigQueryStringNumericInput `elem` soExperimentalFeatures = Options.EnableBigQueryStringNumericInput
| otherwise = Options.DisableBigQueryStringNumericInput
sqlGenCtx = SQLGenCtx soStringifyNum soDangerousBooleanCollapse optimizePermissionFilters bigqueryStringNumericInput
let serverConfigCtx =
ServerConfigCtx
soInferFunctionPermissions
soEnableRemoteSchemaPermissions
sqlGenCtx
soEnableMaintenanceMode
soExperimentalFeatures
soEventingMode
soReadOnlyMode
soDefaultNamingConvention
soMetadataDefaults
checkFeatureFlag
-- migrate the catalog and build the schema cache; if this throws, flush the
-- logger first so that buffered log lines are not lost
rebuildableSchemaCache <-
lift . flip onException (flushLogger loggerCtx) $
migrateCatalogSchema
env
logger
metadataDbPool
maybeDefaultSourceConfig
mempty
serverConfigCtx
(mkPgSourceResolver pgLogger)
mkMSSQLSourceResolver
soExtensionsSchema
-- Start a background thread for listening to schema sync events from other server instances.
metaVersionRef <- liftIO $ STM.newEmptyTMVarIO
-- 'Skip' means that no schema sync listener thread is started
case soSchemaPollInterval of
Skip -> unLogger logger $ mkGenericLog @Text LevelInfo "schema-sync" "Schema sync disabled"
Interval interval -> do
unLogger logger $ mkGenericLog @String LevelInfo "schema-sync" ("Schema sync enabled. Polling at " <> show interval)
void $ startSchemaSyncListenerThread logger metadataDbPool instanceId interval metaVersionRef
schemaCacheRef <- initialiseSchemaCacheRef serverMetrics rebuildableSchemaCache
-- the HTTP manager reads the TLS allowlist from the schema cache reference
srvMgr <- liftIO $ mkHttpManager (readTlsAllowlist schemaCacheRef) mempty
authMode <- liftIO $ initAuthMode serveOptions srvMgr logger
subscriptionsState <- liftIO $ initSubscriptionsState serveOptions logger liveQueryHook
pure $
ServerCtx
{ scLoggers = loggers,
scCacheRef = schemaCacheRef,
scAuthMode = authMode,
scManager = srvMgr,
scSQLGenCtx = sqlGenCtx,
scEnabledAPIs = soEnabledAPIs,
scInstanceId = instanceId,
scSubscriptionState = subscriptionsState,
scEnableAllowList = soEnableAllowList,
scEnvironment = env,
scResponseInternalErrorsConfig = soResponseInternalErrorsConfig,
scRemoteSchemaPermsCtx = soEnableRemoteSchemaPermissions,
scFunctionPermsCtx = soInferFunctionPermissions,
scEnableMaintenanceMode = soEnableMaintenanceMode,
scExperimentalFeatures = soExperimentalFeatures,
scLoggingSettings = LoggingSettings soEnabledLogTypes soEnableMetadataQueryLogging,
scEventingMode = soEventingMode,
scEnableReadOnlyMode = soReadOnlyMode,
scDefaultNamingConvention = soDefaultNamingConvention,
scServerMetrics = serverMetrics,
scMetadataDefaults = soMetadataDefaults,
scEnabledLogTypes = soEnabledLogTypes,
scMetadataDbPool = metadataDbPool,
scShutdownLatch = latch,
scMetaVersionRef = metaVersionRef,
scPrometheusMetrics = prometheusMetrics,
scTraceSamplingPolicy = traceSamplingPolicy,
scCheckFeatureFlag = checkFeatureFlag
}
-- | Create the logger context for the enabled log types and log level, and
-- bundle it with the Hasura logger and Postgres logger derived from it.
mkLoggers ::
  (MonadIO m, MonadBaseControl IO m) =>
  HashSet (EngineLogType Hasura) ->
  LogLevel ->
  ManagedT m Loggers
mkLoggers enabledLogs logLevel = do
  loggerCtx <- mkLoggerCtx (defaultLoggerSettings True logLevel) enabledLogs
  let hasuraLogger = mkLogger loggerCtx
  pure $ Loggers loggerCtx hasuraLogger (mkPGLogger hasuraLogger)
-- | Initialise or migrate the @hdb_catalog@ schema, then build the initial
-- schema cache from the resulting metadata (used by pro as well).
--
-- On failure the error is logged as a @catalog_migrate@ startup log and an
-- 'ExitException' with 'DatabaseMigrationError' is thrown. On success the
-- migration result is logged and the schema cache is returned.
migrateCatalogSchema ::
  (MonadIO m, MonadBaseControl IO m) =>
  Env.Environment ->
  Logger Hasura ->
  PG.PGPool ->
  Maybe (SourceConnConfiguration ('Postgres 'Vanilla)) ->
  Blocklist ->
  ServerConfigCtx ->
  SourceResolver ('Postgres 'Vanilla) ->
  SourceResolver ('MSSQL) ->
  ExtensionsSchema ->
  m RebuildableSchemaCache
migrateCatalogSchema
  env
  logger
  pool
  defaultSourceConfig
  blockList
  serverConfigCtx
  pgSourceResolver
  mssqlSourceResolver
  extensionsSchema = do
    outcome <- runExceptT $ do
      -- TODO: should we allow the migration to happen during maintenance mode?
      -- Allowing this can be a sanity check, to see if the hdb_catalog in the
      -- DB has been set correctly
      now <- liftIO Clock.getCurrentTime
      (migrationOutcome, metadata) <-
        PG.runTx pool (PG.Serializable, Just PG.ReadWrite) $
          migrateCatalog
            defaultSourceConfig
            extensionsSchema
            (_sccMaintenanceMode serverConfigCtx)
            now
      -- The HTTP manager used for the cache build honours the TLS allowlist
      -- stored in the freshly read metadata.
      manager <-
        liftIO $
          mkHttpManager (pure (networkTlsAllowlist (_metaNetwork metadata))) blockList
      let buildParams =
            CacheBuildParams manager pgSourceResolver mssqlSourceResolver serverConfigCtx
      builtCache <-
        runCacheBuild buildParams $
          buildRebuildableSchemaCacheWithReason CatalogSync logger env metadata
      pure (migrationOutcome, builtCache)
    case outcome of
      Left err -> do
        unLogger logger $
          StartupLog
            { slLogLevel = LevelError,
              slKind = "catalog_migrate",
              slInfo = A.toJSON err
            }
        throwErrJExit DatabaseMigrationError err
      Right (migrationOutcome, builtCache) -> do
        unLogger logger migrationOutcome
        pure builtCache
-- | Event triggers live in the user's DB and other events
-- (cron, one-off and async actions)
-- live in the metadata DB, so we need a way to differentiate the
-- type of shutdown action
--
-- The event-trigger variant wraps a plain 'IO' action, while the metadata DB
-- variant wraps an action that can fail with a 'QErr'.
data ShutdownAction
= EventTriggerShutdownAction (IO ())
| MetadataDBShutdownAction (ExceptT QErr IO ())
-- | Flush the log buffer of the given logger context.
--
-- Meant to be run when an exception is encountered, before it is rethrown:
-- if the buffer is not flushed, log lines may be missed.
-- See: https://github.com/hasura/graphql-engine/issues/4772
flushLogger :: MonadIO m => LoggerCtx impl -> m ()
flushLogger loggerCtx = liftIO $ FL.flushLogStr (_lcLoggerSet loggerCtx)
-- | This function acts as the entrypoint for the graphql-engine webserver.
--
-- Note: at the exit of this function, or in case of a graceful server shutdown
-- (SIGTERM, or more generally, whenever the shutdown latch is set), we need to
-- make absolutely sure that we clean up any resources which were allocated during
-- server setup. In the case of a multitenant process, failure to do so can lead to
-- resource leaks.
--
-- To track these resources, we use the ManagedT monad, and attach finalizers at
-- the same point in the code where we allocate resources. If you fork a new
-- long-lived thread, or create a connection pool, or allocate any other
-- long-lived resource, make sure to pair the allocator with its finalizer.
-- There are plenty of examples throughout the code. For example, see
-- 'C.forkManagedT'.
--
-- Note also: the order in which the finalizers run can be important. Specifically,
-- we want the finalizers for the logger threads to run last, so that we retain as
-- many "thread stopping" log messages as possible. The order in which the
-- finalizers are run is determined by the order in which they are introduced in the
-- code.
{- HLINT ignore runHGEServer "Avoid lambda" -}
runHGEServer ::
forall m impl.
( MonadIO m,
MonadFix m,
MonadMask m,
MonadStateless IO m,
LA.Forall (LA.Pure m),
UserAuthentication (Tracing.TraceT m),
HttpLog m,
ConsoleRenderer m,
MonadVersionAPIWithExtraData m,
MonadMetadataApiAuthorization m,
MonadGQLExecutionCheck m,
MonadConfigApiHandler m,
MonadQueryLog m,
WS.MonadWSLog m,
MonadExecuteQuery m,
Tracing.HasReporter m,
HasResourceLimits m,
MonadMetadataStorageQueryAPI m,
MonadResolveSource m,
EB.MonadQueryTags m,
MonadEventLogCleanup m
) =>
(ServerCtx -> Spock.SpockT m ()) ->
Env.Environment ->
ServeOptions impl ->
ServerCtx ->
-- | start time
UTCTime ->
-- | A hook which can be called to indicate when the server is started successfully
Maybe (IO ()) ->
EKG.Store EKG.EmptyMetrics ->
(FeatureFlag -> IO Bool) ->
ManagedT m ()
runHGEServer setupHook env serveOptions serverCtx@ServerCtx {..} initTime startupStatusHook ekgStore checkFeatureFlag = do
-- build the WAI application via 'mkHGEServer'
waiApplication <-
mkHGEServer setupHook env serveOptions serverCtx ekgStore checkFeatureFlag
let logger = _lsLogger $ scLoggers
-- `startupStatusHook`: add `Service started successfully` message to config_status
-- table when a tenant starts up in multitenant
let warpSettings :: Warp.Settings
warpSettings =
Warp.setPort (_getPort $ soPort serveOptions)
. Warp.setHost (soHost serveOptions)
. Warp.setGracefulShutdownTimeout (Just 30) -- 30s graceful shutdown
. Warp.setInstallShutdownHandler shutdownHandler
-- run the startup hook, when one was supplied, before Warp's main loop
. Warp.setBeforeMainLoop (for_ startupStatusHook id)
. setForkIOWithMetrics
$ Warp.defaultSettings
-- Fork every Warp thread through a wrapper which increments the
-- warp-thread gauges (EKG and Prometheus) before running the thread body
-- and decrements them afterwards; 'bracket_' pairs the two.
setForkIOWithMetrics :: Warp.Settings -> Warp.Settings
setForkIOWithMetrics = Warp.setFork \f -> do
void $
C.forkIOWithUnmask
( \unmask ->
bracket_
( do
EKG.Gauge.inc (smWarpThreads scServerMetrics)
incWarpThreads (pmConnections scPrometheusMetrics)
)
( do
EKG.Gauge.dec (smWarpThreads scServerMetrics)
decWarpThreads (pmConnections scPrometheusMetrics)
)
(f unmask)
)
-- Installed via 'Warp.setInstallShutdownHandler': starts a linked async
-- thread which waits for the shutdown latch, logs, and then runs the
-- supplied 'closeSocket' action.
shutdownHandler :: IO () -> IO ()
shutdownHandler closeSocket =
LA.link =<< LA.async do
waitForShutdown $ scShutdownLatch
unLogger logger $ mkGenericLog @Text LevelInfo "server" "gracefully shutting down server"
closeSocket
-- the startup duration is measured from 'initTime' up to this point, i.e.
-- just before Warp is started below
finishTime <- liftIO Clock.getCurrentTime
let apiInitTime = realToFrac $ Clock.diffUTCTime finishTime initTime
unLogger logger $
mkGenericLog LevelInfo "server" $
StartupTimeInfo "starting API server" apiInitTime
-- Here we block until the shutdown latch 'MVar' is filled, and then
-- shut down the server. Once this blocking call returns, we'll tidy up
-- any resources using the finalizers attached using 'ManagedT' above.
-- Structuring things using the shutdown latch in this way lets us decide
-- elsewhere exactly how we want to control shutdown.
liftIO $ Warp.runSettings warpSettings waiApplication
-- | Part of a factorization of 'runHGEServer' to expose the constructed WAI
-- application for testing purposes. See 'runHGEServer' for documentation.
mkHGEServer ::
forall m impl.
( MonadIO m,
MonadFix m,
MonadMask m,
MonadStateless IO m,
LA.Forall (LA.Pure m),
UserAuthentication (Tracing.TraceT m),
HttpLog m,
ConsoleRenderer m,
MonadVersionAPIWithExtraData m,
MonadMetadataApiAuthorization m,
MonadGQLExecutionCheck m,
MonadConfigApiHandler m,
MonadQueryLog m,
WS.MonadWSLog m,
MonadExecuteQuery m,
Tracing.HasReporter m,
HasResourceLimits m,
MonadMetadataStorageQueryAPI m,
MonadResolveSource m,
EB.MonadQueryTags m,
MonadEventLogCleanup m
) =>
(ServerCtx -> Spock.SpockT m ()) ->
Env.Environment ->
ServeOptions impl ->
ServerCtx ->
EKG.Store EKG.EmptyMetrics ->
(FeatureFlag -> IO Bool) ->
ManagedT m Application
mkHGEServer setupHook env ServeOptions {..} serverCtx@ServerCtx {..} ekgStore checkFeatureFlag = do
-- Comment this to enable expensive assertions from "GHC.AssertNF". These
-- will log lines to STDOUT containing "not in normal form". In the future we
-- could try to integrate this into our tests. For now this is a development
-- tool.
--
-- NOTE: be sure to compile WITHOUT code coverage, for this to work properly.
liftIO disableAssertNF
let optimizePermissionFilters
| EFOptimizePermissionFilters `elem` soExperimentalFeatures = Options.OptimizePermissionFilters
| otherwise = Options.Don'tOptimizePermissionFilters
bigqueryStringNumericInput
| EFBigQueryStringNumericInput `elem` soExperimentalFeatures = Options.EnableBigQueryStringNumericInput
| otherwise = Options.DisableBigQueryStringNumericInput
sqlGenCtx = SQLGenCtx soStringifyNum soDangerousBooleanCollapse optimizePermissionFilters bigqueryStringNumericInput
Loggers loggerCtx logger _ = scLoggers
HasuraApp app cacheRef actionSubState stopWsServer <-
lift $
flip onException (flushLogger loggerCtx) $
mkWaiApp
setupHook
env
soCorsConfig
soConsoleStatus
soConsoleAssetsDir
soConsoleSentryDsn
soEnableTelemetry
scCacheRef
soConnectionOptions
soWebSocketKeepAlive
scEnabledLogTypes
serverCtx
soWebSocketConnectionInitTimeout
ekgStore
-- Init ServerConfigCtx
let serverConfigCtx =
ServerConfigCtx
soInferFunctionPermissions
soEnableRemoteSchemaPermissions
sqlGenCtx
soEnableMaintenanceMode
soExperimentalFeatures
soEventingMode
soReadOnlyMode
soDefaultNamingConvention
soMetadataDefaults
checkFeatureFlag
-- Log Warning if deprecated environment variables are used
sources <- scSources <$> liftIO (getSchemaCache cacheRef)
liftIO $ logDeprecatedEnvVars logger env sources
-- log inconsistent schema objects
inconsObjs <- scInconsistentObjs <$> liftIO (getSchemaCache cacheRef)
liftIO $ logInconsistentMetadata logger inconsObjs
-- NOTE: `newLogTVar` is being used to make sure that the metadata logger runs only once
-- while logging errors or any `inconsistent_metadata` logs.
newLogTVar <- liftIO $ STM.newTVarIO False
-- Start a background thread for processing schema sync event present in the '_sscSyncEventRef'
_ <-
startSchemaSyncProcessorThread
logger
scManager
scMetaVersionRef
cacheRef
scInstanceId
serverConfigCtx
newLogTVar
lockedEventsCtx <-
liftIO $
LockedEventsCtx
<$> STM.newTVarIO mempty
<*> STM.newTVarIO mempty
<*> STM.newTVarIO mempty
<*> STM.newTVarIO mempty
case soEventingMode of
EventingEnabled -> do
startEventTriggerPollerThread logger lockedEventsCtx cacheRef
startAsyncActionsPollerThread logger lockedEventsCtx cacheRef actionSubState
-- Create logger for logging the statistics of fetched cron triggers
fetchedCronTriggerStatsLogger <-
allocate
(createFetchedCronTriggerStatsLogger logger)
(closeFetchedCronTriggersStatsLogger logger)
-- start a background thread to create new cron events
_cronEventsThread <-
C.forkManagedT "runCronEventsGenerator" logger $
runCronEventsGenerator logger fetchedCronTriggerStatsLogger (getSchemaCache cacheRef)
startScheduledEventsPollerThread logger lockedEventsCtx cacheRef
EventingDisabled ->
unLogger logger $ mkGenericLog @Text LevelInfo "server" "starting in eventing disabled mode"
-- start a background thread to check for updates
_updateThread <-
C.forkManagedT "checkForUpdates" logger $
liftIO $
checkForUpdates loggerCtx scManager
-- Start a background thread for source pings
_sourcePingPoller <-
C.forkManagedT "sourcePingPoller" logger $ do
let pingLog =
unLogger logger . mkGenericLog @String LevelInfo "sources-ping"
liftIO
( runPingSources
pingLog
(scSourcePingConfig <$> getSchemaCache cacheRef)
)
-- start a background thread for telemetry
_telemetryThread <-
if isTelemetryEnabled soEnableTelemetry
then do
lift . unLogger logger $ mkGenericLog @Text LevelInfo "telemetry" telemetryNotice
dbUid <-
getMetadataDbUid `onLeftM` throwErrJExit DatabaseMigrationError
pgVersion <-
liftIO (runExceptT $ PG.runTx scMetadataDbPool (PG.ReadCommitted, Nothing) $ getPgVersion)
`onLeftM` throwErrJExit DatabaseMigrationError
telemetryThread <-
C.forkManagedT "runTelemetry" logger $
liftIO $
runTelemetry logger scManager (getSchemaCache cacheRef) dbUid scInstanceId pgVersion soExperimentalFeatures
return $ Just telemetryThread
else return Nothing
-- These cleanup actions are not directly associated with any
-- resource, but we still need to make sure we clean them up here.
allocate_ (pure ()) (liftIO stopWsServer)
pure app
where
isRetryRequired _ resp = do
return $ case resp of
Right _ -> False
Left err -> qeCode err == ConcurrentUpdate
prepareScheduledEvents (Logger logger) = do
liftIO $ logger $ mkGenericLog @Text LevelInfo "scheduled_triggers" "preparing data"
res <- Retry.retrying Retry.retryPolicyDefault isRetryRequired (return unlockAllLockedScheduledEvents)
onLeft res (\err -> logger $ mkGenericLog @String LevelError "scheduled_triggers" (show $ qeError err))
getProcessingScheduledEventsCount :: LockedEventsCtx -> IO Int
getProcessingScheduledEventsCount LockedEventsCtx {..} = do
processingCronEvents <- readTVarIO leCronEvents
processingOneOffEvents <- readTVarIO leOneOffEvents
return $ length processingOneOffEvents + length processingCronEvents
shutdownEventTriggerEvents ::
[BackendSourceInfo] ->
Logger Hasura ->
LockedEventsCtx ->
IO ()
shutdownEventTriggerEvents sources (Logger logger) LockedEventsCtx {..} = do
-- TODO: is this correct?
-- event triggers should be tied to the life cycle of a source
lockedEvents <- readTVarIO leEvents
forM_ sources $ \backendSourceInfo -> do
AB.dispatchAnyBackend @BackendEventTrigger backendSourceInfo \(SourceInfo sourceName _ _ _ sourceConfig _ _ :: SourceInfo b) -> do
let sourceNameText = sourceNameToText sourceName
logger $ mkGenericLog LevelInfo "event_triggers" $ "unlocking events of source: " <> sourceNameText
for_ (HM.lookup sourceName lockedEvents) $ \sourceLockedEvents -> do
-- No need to execute unlockEventsTx when events are not present
for_ (NE.nonEmptySet sourceLockedEvents) $ \nonEmptyLockedEvents -> do
res <- Retry.retrying Retry.retryPolicyDefault isRetryRequired (return $ unlockEventsInSource @b sourceConfig nonEmptyLockedEvents)
case res of
Left err ->
logger $
mkGenericLog LevelWarn "event_trigger" $
"Error while unlocking event trigger events of source: " <> sourceNameText <> " error:" <> showQErr err
Right count ->
logger $
mkGenericLog LevelInfo "event_trigger" $
tshow count <> " events of source " <> sourceNameText <> " were successfully unlocked"
shutdownAsyncActions ::
LockedEventsCtx ->
ExceptT QErr m ()
shutdownAsyncActions lockedEventsCtx = do
lockedActionEvents <- liftIO $ readTVarIO $ leActionEvents lockedEventsCtx
liftEitherM $ setProcessingActionLogsToPending (LockedActionIdArray $ toList lockedActionEvents)
-- This function is a helper function to do couple of things:
--
-- 1. When the value of the `graceful-shutdown-timeout` > 0, we poll
-- the in-flight events queue we maintain using the `processingEventsCountAction`
-- number of in-flight processing events, in case of actions it is the
-- actions which are in 'processing' state and in scheduled events
-- it is the events which are in 'locked' state. The in-flight events queue is polled
-- every 5 seconds until either the graceful shutdown time is exhausted
-- or the number of in-flight processing events is 0.
-- 2. After step 1, we unlock all the events which were attempted to process by the current
-- graphql-engine instance that are still in the processing
-- state. In actions, it means to set the status of such actions to 'pending'
-- and in scheduled events, the status will be set to 'unlocked'.
waitForProcessingAction ::
Logger Hasura ->
String ->
IO Int ->
ShutdownAction ->
Seconds ->
IO ()
waitForProcessingAction l@(Logger logger) actionType processingEventsCountAction' shutdownAction maxTimeout
| maxTimeout <= 0 = do
case shutdownAction of
EventTriggerShutdownAction userDBShutdownAction -> userDBShutdownAction
MetadataDBShutdownAction metadataDBShutdownAction ->
runExceptT metadataDBShutdownAction >>= \case
Left err ->
logger $
mkGenericLog LevelWarn (T.pack actionType) $
"Error while unlocking the processing "
<> tshow actionType
<> " err - "
<> showQErr err
Right () -> pure ()
| otherwise = do
processingEventsCount <- processingEventsCountAction'
if (processingEventsCount == 0)
then
logger $
mkGenericLog @Text LevelInfo (T.pack actionType) $
"All in-flight events have finished processing"
else unless (processingEventsCount == 0) $ do
C.sleep (5) -- sleep for 5 seconds and then repeat
waitForProcessingAction l actionType processingEventsCountAction' shutdownAction (maxTimeout - (Seconds 5))
-- Forks the "processEventQueue" worker with a graceful-shutdown hook, unless
-- event polling is disabled by configuration.
startEventTriggerPollerThread logger lockedEventsCtx cacheRef = do
  currentSchemaCache <- liftIO (getSchemaCache cacheRef)
  let httpPoolSize = unrefine soEventsHttpPoolSize
      pollInterval = milliseconds (unrefine soEventsFetchInterval)
      sources = HM.elems (scSources currentSchemaCache)
      pollingDisabled = unrefine soEventsFetchBatchSize == 0 || pollInterval == 0
  -- Don't start the events poller thread when fetchBatchSize or fetchInterval is 0
  unless pollingDisabled $ do
    -- prepare event triggers data
    engineCtx <- liftIO . atomically $ initEventEngineCtx httpPoolSize pollInterval soEventsFetchBatchSize
    -- Logger for the statistics of events fetched
    statsLogger <- allocate (createFetchedEventsStatsLogger logger) (closeFetchedEventsStatsLogger logger)
    unLogger logger $ mkGenericLog @Text LevelInfo "event_triggers" "starting workers"
    let inFlightCount = length <$> readTVarIO (leEvents lockedEventsCtx)
        unlockEvents = EventTriggerShutdownAction (shutdownEventTriggerEvents sources logger lockedEventsCtx)
        gracefulShutdown =
          waitForProcessingAction
            logger
            "event_triggers"
            inFlightCount
            unlockEvents
            (unrefine soGracefulShutdownTimeout)
    void $
      C.forkManagedTWithGracefulShutdown
        "processEventQueue"
        logger
        (C.ThreadShutdown (liftIO gracefulShutdown))
        ( processEventQueue
            logger
            statsLogger
            scManager
            (getSchemaCache cacheRef)
            engineCtx
            lockedEventsCtx
            scServerMetrics
            (pmEventTriggerMetrics scPrometheusMetrics)
            soEnableMaintenanceMode
        )
-- Starts the background threads that serve async actions.
-- Takes the logger, the locked-events context, the schema cache reference and
-- the async-action subscription state.
startAsyncActionsPollerThread logger lockedEventsCtx cacheRef actionSubState = do
-- start a background thread to handle async actions
case soAsyncActionsFetchInterval of
Skip -> pure () -- Don't start the poller thread
Interval (unrefine -> sleepTime) -> do
let label = "asyncActionsProcessor"
-- Shutdown hook: calls 'waitForProcessingAction' with the number of entries in
-- 'leActionEvents' as the in-flight count and 'shutdownAsyncActions' as the
-- metadata DB shutdown action.
asyncActionGracefulShutdownAction =
( liftWithStateless \lowerIO ->
( waitForProcessingAction
logger
"async_actions"
(length <$> readTVarIO (leActionEvents lockedEventsCtx))
(MetadataDBShutdownAction (hoist lowerIO (shutdownAsyncActions lockedEventsCtx)))
(unrefine soGracefulShutdownTimeout)
)
)
-- Fork the processor thread, registering the shutdown hook defined above.
void
$ C.forkManagedTWithGracefulShutdown
label
logger
(C.ThreadShutdown asyncActionGracefulShutdownAction)
$ asyncActionsProcessor
env
logger
(getSchemaCache cacheRef)
(leActionEvents lockedEventsCtx)
scManager
scPrometheusMetrics
sleepTime
Nothing
-- start a background thread to handle async action live queries
-- NOTE(review): indentation is lost in this view, so it cannot be determined here
-- whether this fork belongs to the 'Interval' branch or follows the 'case' — confirm.
void $
C.forkManagedT "asyncActionSubscriptionsProcessor" logger $
asyncActionSubscriptionsProcessor actionSubState
-- Prepares scheduled events and forks the "processScheduledTriggers" worker
-- with a graceful-shutdown hook.
startScheduledEventsPollerThread logger lockedEventsCtx cacheRef = do
  -- prepare scheduled triggers
  lift (prepareScheduledEvents logger)
  -- Logger for the statistics of scheduled events fetched
  statsLogger <-
    allocate
      (createFetchedScheduledEventsStatsLogger logger)
      (closeFetchedScheduledEventsStatsLogger logger)
  -- Shutdown hook: calls 'waitForProcessingAction' with
  -- 'unlockAllLockedScheduledEvents' as the metadata DB shutdown action.
  let gracefulShutdown =
        liftWithStateless
          ( \lowerIO ->
              waitForProcessingAction
                logger
                "scheduled_events"
                (getProcessingScheduledEventsCount lockedEventsCtx)
                (MetadataDBShutdownAction (liftEitherM $ hoist lowerIO unlockAllLockedScheduledEvents))
                (unrefine soGracefulShutdownTimeout)
          )
  -- start a background thread to deliver the scheduled events
  void $
    C.forkManagedTWithGracefulShutdown
      "processScheduledTriggers"
      logger
      (C.ThreadShutdown gracefulShutdown)
      ( processScheduledTriggers
          env
          logger
          statsLogger
          scManager
          scPrometheusMetrics
          (getSchemaCache cacheRef)
          lockedEventsCtx
      )
-- The instance has no body, so it relies on the class's default method definitions.
instance (Monad m) => Tracing.HasReporter (PGMetadataStorageAppT m)
-- | Both limits are built from 'id'.
instance (Monad m) => HasResourceLimits (PGMetadataStorageAppT m) where
  askHTTPHandlerLimit = return (ResourceLimits id)
  askGraphqlOperationLimit _ _ _ = return (ResourceLimits id)
-- | HTTP logging with no extra metadata: the extra-metadata type is '()'.
instance (MonadIO m) => HttpLog (PGMetadataStorageAppT m) where
  type ExtraHttpLogMetadata (PGMetadataStorageAppT m) = ()

  emptyExtraHttpLogMetadata = ()

  buildExtraHttpLogMetadata _ _ = ()

  -- Logs a failed request; the last argument (extra metadata) is ignored.
  logHttpError logger loggingSettings userInfoM reqId waiReq req qErr headers _ =
    unLogger
      logger
      ( mkHttpLog
          (mkHttpErrorLogContext userInfoM loggingSettings reqId waiReq req qErr Nothing Nothing headers)
      )

  -- Logs a successful request, reporting the length of the response body.
  logHttpSuccess logger loggingSettings userInfoM reqId waiReq reqBody response compressedResponse qTime cType headers (CommonHttpLogMetadata rb batchQueryOpLogs, ()) =
    let responseSize = BL.length response
     in unLogger
          logger
          ( mkHttpLog
              (mkHttpAccessLogContext userInfoM loggingSettings reqId waiReq reqBody responseSize compressedResponse qTime cType headers rb batchQueryOpLogs)
          )
-- | Lookups always return an empty list and 'Nothing'; stores always report 'CacheStoreSkipped'.
instance (Monad m) => MonadExecuteQuery (PGMetadataStorageAppT m) where
  cacheLookup _ _ _ _ = return ([], Nothing)
  cacheStore _ _ _ = return (Right CacheStoreSkipped)
-- | Resolves the user via 'getUserInfoWithExpTime' and appends an empty 'ExtraUserInfo'.
instance (MonadIO m, MonadBaseControl IO m) => UserAuthentication (Tracing.TraceT (PGMetadataStorageAppT m)) where
  resolveUserInfo logger manager headers authMode reqs =
    runExceptT $
      getUserInfoWithExpTime logger manager headers authMode reqs
        >>= \(x, y, z) -> pure (x, y, z, ExtraUserInfo Nothing)
-- | Error message used when a caller without the admin role is rejected.
accessDeniedErrMsg :: Text
accessDeniedErrMsg = "restricted access : admin only"
-- | Each check fails with a 400 'AccessDenied' error (under path key "args")
-- when the caller's role is not the admin role.
instance (Monad m) => MonadMetadataApiAuthorization (PGMetadataStorageAppT m) where
  -- Only queries for which 'requiresAdmin' holds are restricted.
  authorizeV1QueryApi query handlerCtx = runExceptT do
    let callerIsAdmin = _uiRole (hcUser handlerCtx) == adminRoleName
    unless (not (requiresAdmin query) || callerIsAdmin) $
      withPathK "args" (throw400 AccessDenied accessDeniedErrMsg)

  authorizeV1MetadataApi _ handlerCtx = runExceptT do
    let callerIsAdmin = _uiRole (hcUser handlerCtx) == adminRoleName
    unless callerIsAdmin $
      withPathK "args" (throw400 AccessDenied accessDeniedErrMsg)

  authorizeV2QueryApi _ handlerCtx = runExceptT do
    let callerIsAdmin = _uiRole (hcUser handlerCtx) == adminRoleName
    unless callerIsAdmin $
      withPathK "args" (throw400 AccessDenied accessDeniedErrMsg)
-- | Renders the console page by delegating to 'mkConsoleHTML'.
instance (Monad m) => ConsoleRenderer (PGMetadataStorageAppT m) where
  renderConsole path authMode enableTelemetry consoleAssetsDir consoleSentryDsn =
    pure (mkConsoleHTML path authMode enableTelemetry consoleAssetsDir consoleSentryDsn)
-- | No extra data is reported by the version API.
instance (Monad m) => MonadVersionAPIWithExtraData (PGMetadataStorageAppT m) where
  getExtraDataForVersionAPI = pure []
-- | Pre-execution checks for GraphQL requests.
instance (Monad m) => MonadGQLExecutionCheck (PGMetadataStorageAppT m) where
  -- Parses the request, checks it against the allowlist and returns the parsed request.
  checkGQLExecution userInfo _ enableAL sc query _ = runExceptT $ do
    parsedReq <- toParsed query
    parsedReq <$ checkQueryInAllowlist enableAL AllowlistModeGlobalOnly userInfo parsedReq sc

  -- The introspection query is wrapped in 'ExecStepRaw' unchanged.
  executeIntrospection _ introspectionQuery _ =
    return (Right (ExecStepRaw introspectionQuery))

  -- Batched requests always pass.
  checkGQLBatchedReqs _ _ _ _ = runExceptT (pure ())
-- | The config API is served by 'configApiGetHandler'.
instance
  (MonadIO m, MonadBaseControl IO m) =>
  MonadConfigApiHandler (PGMetadataStorageAppT m)
  where
  runConfigApiHandler = configApiGetHandler
-- | Query logs are written with the supplied logger.
instance (MonadIO m) => MonadQueryLog (PGMetadataStorageAppT m) where
  logQueryLog logger queryLog = unLogger logger queryLog
-- | Websocket logs are written with the supplied logger.
instance (MonadIO m) => WS.MonadWSLog (PGMetadataStorageAppT m) where
  logWSLog logger wsLog = unLogger logger wsLog
-- | The Postgres resolver is built from the second component of the reader
-- environment; the MSSQL resolver needs no environment.
instance (Monad m) => MonadResolveSource (PGMetadataStorageAppT m) where
  getPGSourceResolver = asks (mkPgSourceResolver . snd)
  getMSSQLSourceResolver = pure mkMSSQLSourceResolver
-- | Query tags are always the empty comment.
instance (Monad m) => EB.MonadQueryTags (PGMetadataStorageAppT m) where
  createQueryTags _attributes _qtSourceConfig = pure emptyQueryTagsComment
-- | Running the log cleaner yields a 'NotSupported' error; the schedule
-- operations succeed without doing anything.
instance (Monad m) => MonadEventLogCleanup (PGMetadataStorageAppT m) where
  runLogCleaner _ =
    return (throw400 NotSupported "Event log cleanup feature is enterprise edition only")
  generateCleanupSchedules _ _ _ = return (Right ())
  updateTriggerCleanupSchedules _ _ _ _ = return (Right ())
-- | Runs the given transaction in its own 'PG.RepeatableRead' transaction on
-- the pool held in the first component of the reader environment, returning
-- the error as a 'Left' instead of throwing it.
runInSeparateTx ::
  (MonadIO m) =>
  PG.TxE QErr a ->
  PGMetadataStorageAppT m (Either QErr a)
runInSeparateTx tx =
  asks fst >>= \metadataPool ->
    liftIO . runExceptT $ PG.runTx metadataPool (PG.RepeatableRead, Nothing) tx
-- | Upserts the single row (id 1) of @hdb_catalog.hdb_schema_notifications@
-- with the given invalidations, resource version and instance id.
notifySchemaCacheSyncTx :: MetadataResourceVersion -> InstanceId -> CacheInvalidations -> PG.TxE QErr ()
notifySchemaCacheSyncTx (MetadataResourceVersion resourceVersion) instanceId invalidations =
  PG.withQE defaultTxErrorHandler upsertNotification queryArgs True
    >>= \(PG.Discard ()) -> pure ()
  where
    queryArgs = (PG.ViaJSON invalidations, resourceVersion, instanceId)
    upsertNotification =
      [PG.sql|
INSERT INTO hdb_catalog.hdb_schema_notifications(id, notification, resource_version, instance_id)
VALUES (1, $1::json, $2, $3::uuid)
ON CONFLICT (id) DO UPDATE SET
notification = $1::json,
resource_version = $2,
instance_id = $3::uuid
|]
-- | Reads the database id, CLI state and console state from
-- @hdb_catalog.hdb_version@ and packs them into a 'CatalogState'.
getCatalogStateTx :: PG.TxE QErr CatalogState
getCatalogStateTx = do
  versionRow <-
    PG.withQE
      defaultTxErrorHandler
      [PG.sql|
SELECT hasura_uuid::text, cli_state::json, console_state::json
FROM hdb_catalog.hdb_version
|]
      ()
      False
  let (dbId, PG.ViaJSON cliState, PG.ViaJSON consoleState) = PG.getRow versionRow
  pure (CatalogState dbId cliState consoleState)
-- | Writes the given JSON value into the CLI-state or console-state column of
-- @hdb_catalog.hdb_version@, depending on the state type.
setCatalogStateTx :: CatalogStateType -> A.Value -> PG.TxE QErr ()
setCatalogStateTx stateTy stateValue =
  PG.unitQE defaultTxErrorHandler updateQuery (Identity $ PG.ViaJSON stateValue) False
  where
    -- The two statements differ only in the column being updated.
    updateQuery = case stateTy of
      CSTCli ->
        [PG.sql|
UPDATE hdb_catalog.hdb_version
SET cli_state = $1
|]
      CSTConsole ->
        [PG.sql|
UPDATE hdb_catalog.hdb_version
SET console_state = $1
|]
-- | Every method of this instance runs its catalog operation in its own,
-- separate transaction (see 'runInSeparateTx').
instance (MonadIO m) => MonadMetadataStorage (PGMetadataStorageAppT m) where
  fetchMetadataResourceVersion = runInSeparateTx fetchMetadataResourceVersionFromCatalog
  fetchMetadata = runInSeparateTx fetchMetadataAndResourceVersionFromCatalog
  fetchMetadataNotifications x y = runInSeparateTx (fetchMetadataNotificationsFromCatalog x y)
  setMetadata x y = runInSeparateTx (setMetadataInCatalog x y)
  notifySchemaCacheSync x y z = runInSeparateTx (notifySchemaCacheSyncTx x y z)
  getCatalogState = runInSeparateTx getCatalogStateTx
  setCatalogState x y = runInSeparateTx (setCatalogStateTx x y)

  getMetadataDbUid = runInSeparateTx getDbId
  checkMetadataStorageHealth = runInSeparateTx checkDbConnection

  getDeprivedCronTriggerStats x = runInSeparateTx (getDeprivedCronTriggerStatsTx x)
  getScheduledEventsForDelivery = runInSeparateTx getScheduledEventsForDeliveryTx
  insertCronEvents x = runInSeparateTx (insertCronEventsTx x)
  insertOneOffScheduledEvent x = runInSeparateTx (insertOneOffScheduledEventTx x)
  insertScheduledEventInvocation x y = runInSeparateTx (insertInvocationTx x y)
  setScheduledEventOp x y z = runInSeparateTx (setScheduledEventOpTx x y z)
  unlockScheduledEvents x y = runInSeparateTx (unlockScheduledEventsTx x y)
  unlockAllLockedScheduledEvents = runInSeparateTx unlockAllLockedScheduledEventsTx
  clearFutureCronEvents x = runInSeparateTx (dropFutureCronEventsTx x)
  getOneOffScheduledEvents x y z = runInSeparateTx (getOneOffScheduledEventsTx x y z)
  getCronEvents x y z w = runInSeparateTx (getCronEventsTx x y z w)
  getScheduledEventInvocations x = runInSeparateTx (getScheduledEventInvocationsTx x)
  deleteScheduledEvent x y = runInSeparateTx (deleteScheduledEventTx x y)

  insertAction x y z w = runInSeparateTx (insertActionTx x y z w)
  fetchUndeliveredActionEvents = runInSeparateTx fetchUndeliveredActionEventsTx
  setActionStatus x y = runInSeparateTx (setActionStatusTx x y)
  fetchActionResponse x = runInSeparateTx (fetchActionResponseTx x)
  clearActionData x = runInSeparateTx (clearActionDataTx x)
  setProcessingActionLogsToPending x = runInSeparateTx (setProcessingActionLogsToPendingTx x)
-- The instance has no body, so it relies on the class's default method definitions.
instance MonadIO m => MonadMetadataStorageQueryAPI (PGMetadataStorageAppT m)
--- helper functions ---
-- | Renders the console HTML page from the embedded @src-rsr/console.html@
-- template.
--
-- Arguments, in order: the sub-path under @/console@ (empty for the console
-- root), the auth mode, the telemetry status, the optional console assets
-- directory and the optional Sentry DSN.
--
-- Returns whatever 'renderHtmlTemplate' yields for the template and the
-- variables built here.
mkConsoleHTML :: Text -> AuthMode -> TelemetryStatus -> Maybe Text -> Maybe Text -> Either String Text
mkConsoleHTML path authMode enableTelemetry consoleAssetsDir consoleSentryDsn =
  renderHtmlTemplate consoleTmplt $
    -- variables required to render the template
    A.object
      [ "isAdminSecretSet" A..= isAdminSecretSet authMode,
        "consolePath" A..= consolePath,
        "enableTelemetry" A..= boolToText (isTelemetryEnabled enableTelemetry),
        -- the template's "cdnAssets" flag is set when no assets directory is given
        "cdnAssets" A..= boolToText (isNothing consoleAssetsDir),
        -- Each key must appear only once: a second, hard-coded
        -- "consoleSentryDsn" entry would override the supplied DSN.
        "consoleSentryDsn" A..= fromMaybe "" consoleSentryDsn,
        "assetsVersion" A..= consoleAssetsVersion,
        "serverVersion" A..= currentVersion
      ]
  where
    consolePath = case path of
      "" -> "/console"
      r -> "/console/" <> r
    consoleTmplt = $(makeRelativeToProject "src-rsr/console.html" >>= M.embedSingleTemplate)
-- | Notice text telling users that anonymized usage stats are collected and
-- where to read more or opt out.
telemetryNotice :: Text
telemetryNotice =
  mconcat
    [ "Help us improve Hasura! The graphql-engine server collects anonymized ",
      "usage stats which allows us to keep improving Hasura at warp speed. ",
      "To read more or opt-out, visit https://hasura.io/docs/latest/graphql/core/guides/telemetry.html"
    ]
-- | Builds a Postgres 'SourceResolver': resolves the connection URL from the
-- process environment, creates a connection pool and wraps it in a
-- 'PGSourceConfig'.
--
-- The first argument is the logger handed to the pool; the resolver's own
-- first argument is ignored. Failures inside the 'ExceptT' block (such as
-- from 'resolveUrlConf') are returned through 'runExceptT'.
mkPgSourceResolver :: PG.PGLogger -> SourceResolver ('Postgres 'Vanilla)
mkPgSourceResolver pgLogger _ config = runExceptT do
  env <- lift Env.getEnvironment
  let PostgresSourceConnInfo urlConf poolSettings allowPrepare isoLevel _ = _pccConnectionInfo config
      -- If the user does not provide values for the pool settings, then use the default values
  let (maxConns, idleTimeout, retries) = getDefaultPGPoolSettingIfNotExists poolSettings defaultPostgresPoolSettings
  urlText <- resolveUrlConf env urlConf
  let connInfo = PG.ConnInfo retries $ PG.CDDatabaseURI $ txtToBs urlText
      connParams =
        PG.defaultConnParams
          { PG.cpIdleTime = idleTimeout,
            PG.cpConns = maxConns,
            PG.cpAllowPrepare = allowPrepare,
            -- lifetime and timeout are only set when pool settings were supplied
            PG.cpMbLifetime = _ppsConnectionLifetime =<< poolSettings,
            PG.cpTimeout = _ppsPoolTimeout =<< poolSettings
          }
  -- Use the same 'PG' qualifier as every other pg-client reference in this module.
  pgPool <- liftIO $ PG.initPGPool connInfo connParams pgLogger
  let pgExecCtx = mkPGExecCtx isoLevel pgPool NeverResizePool
  pure $ PGSourceConfig pgExecCtx connInfo Nothing mempty (_pccExtensionsSchema config) mempty Nothing
-- | Builds an MSSQL source config: creates a connection pool from the
-- connection info and wraps it, with the connection string returned by
-- 'createMSSQLPool', in an 'MSSQLSourceConfig'. The source name is ignored.
mkMSSQLSourceResolver :: SourceResolver ('MSSQL)
mkMSSQLSourceResolver _name (MSSQLConnConfiguration connInfo _) = runExceptT do
  environment <- lift Env.getEnvironment
  let MSSQLConnectionInfo rawConnString MSSQLPoolSettings {..} = connInfo
      -- fall back to the default connection count when none is configured
      maxConnections = fromMaybe defaultMSSQLMaxConnections _mpsMaxConnections
      poolOptions =
        MSPool.ConnectionOptions
          { _coConnections = maxConnections,
            _coStripes = 1,
            _coIdleTime = _mpsIdleTimeout
          }
  (resolvedConnString, pool) <- createMSSQLPool rawConnString poolOptions environment
  pure (MSSQLSourceConfig resolvedConnString (mkMSSQLExecCtx pool NeverResizePool))