generalize query execution logic on Postgres (#5110)

* generalize PGExecCtx to support specialized functions for various operations

* fix tests compilation

* allow customising PGExecCtx when starting the web server
This commit is contained in:
Vamshi Surabhi 2020-06-16 23:14:59 +05:30 committed by GitHub
parent 0cf4cbc5c6
commit 6fc404329a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 82 additions and 48 deletions

View File

@ -28,15 +28,15 @@ runApp (HGEOptionsG rci hgeCmd) =
HCServe serveOptions -> do
(initCtx, initTime) <- initialiseCtx hgeCmd rci
-- Catches the SIGTERM signal and initiates a graceful shutdown.
-- Graceful shutdown for regular HTTP requests is already implemented in
-- Graceful shutdown for regular HTTP requests is already implemented in
-- Warp, and is triggered by invoking the 'closeSocket' callback.
-- We only catch the SIGTERM signal once, that is, if the user hits CTRL-C
-- We only catch the SIGTERM signal once, that is, if the user hits CTRL-C
-- once again, we terminate the process immediately.
_ <- liftIO $ Signals.installHandler
Signals.sigTERM
(Signals.CatchOnce (shutdownGracefully initCtx))
Nothing
runHGEServer serveOptions initCtx initTime
runHGEServer serveOptions initCtx Nothing initTime
HCExport -> do
(initCtx, _) <- initialiseCtx hgeCmd rci
res <- runTx' initCtx fetchMetadata Q.ReadCommitted

View File

@ -217,13 +217,17 @@ runHGEServer
)
=> ServeOptions impl
-> InitCtx
-> Maybe PGExecCtx
-- ^ An optional specialized pg exection context for executing queries
-- and mutations
-> UTCTime
-- ^ start time
-> m ()
runHGEServer ServeOptions{..} InitCtx{..} initTime = do
-- Comment this to enable expensive assertions from "GHC.AssertNF". These will log lines to
-- STDOUT containing "not in normal form". In the future we could try to integrate this into
-- our tests. For now this is a development tool.
runHGEServer ServeOptions{..} InitCtx{..} pgExecCtx initTime = do
-- Comment this to enable expensive assertions from "GHC.AssertNF". These
-- will log lines to STDOUT containing "not in normal form". In the future we
-- could try to integrate this into our tests. For now this is a development
-- tool.
--
-- NOTE: be sure to compile WITHOUT code coverage, for this to work properly.
liftIO disableAssertNF
@ -236,8 +240,9 @@ runHGEServer ServeOptions{..} InitCtx{..} initTime = do
authMode <- either (printErrExit . T.unpack) return authModeRes
-- If an exception is encountered in 'mkWaiApp', flush the log buffer and rethrow
-- If we do not flush the log buffer on exception, then log lines written in 'mkWaiApp' may be missed
-- If an exception is encountered in 'mkWaiApp', flush the log buffer and
-- rethrow If we do not flush the log buffer on exception, then log lines
-- written in 'mkWaiApp' may be missed
-- See: https://github.com/hasura/graphql-engine/issues/4772
let flushLogger = liftIO $ FL.flushLogStr $ _lcLoggerSet loggerCtx
HasuraApp app cacheRef cacheInitTime shutdownApp <- flip onException flushLogger $
@ -246,6 +251,7 @@ runHGEServer ServeOptions{..} InitCtx{..} initTime = do
sqlGenCtx
soEnableAllowlist
_icPgPool
pgExecCtx
_icConnInfo
_icHttpManager
authMode
@ -387,7 +393,7 @@ runAsAdmin
-> m (Either QErr a)
runAsAdmin pool sqlGenCtx httpManager m = do
let runCtx = RunCtx adminUserInfo httpManager sqlGenCtx
pgCtx = PGExecCtx pool Q.Serializable
pgCtx = mkPGExecCtx Q.Serializable pool
runExceptT $ peelRun runCtx pgCtx Q.ReadWrite m
execQuery

View File

@ -8,8 +8,9 @@ module Hasura.Db
, LazyTx
, PGExecCtx(..)
, mkPGExecCtx
, runLazyTx
, runLazyTx'
, runQueryTx
, withUserInfo
, sessionInfoJsonExp
@ -23,6 +24,7 @@ import Control.Lens
import Control.Monad.Trans.Control (MonadBaseControl (..))
import Control.Monad.Unique
import Control.Monad.Validate
import Data.Either (isRight)
import qualified Data.Aeson.Extended as J
import qualified Database.PG.Query as Q
@ -39,10 +41,35 @@ import qualified Hasura.SQL.DML as S
data PGExecCtx
= PGExecCtx
{ _pecPool :: !Q.PGPool
, _pecTxIsolation :: !Q.TxIsolation
{ _pecRunReadOnly :: (forall a. Q.TxE QErr a -> ExceptT QErr IO a)
-- ^ Run a Q.ReadOnly transaction
, _pecRunReadNoTx :: (forall a. Q.TxE QErr a -> ExceptT QErr IO a)
-- ^ Run a read only statement without an explicit transaction block
, _pecRunReadWrite :: (forall a. Q.TxE QErr a -> ExceptT QErr IO a)
-- ^ Run a Q.ReadWrite transaction
, _pecCheckHealth :: (IO Bool)
-- ^ Checks the health of this execution context
}
-- | Creates a Postgres execution context for a single Postgres master pool
mkPGExecCtx :: Q.TxIsolation -> Q.PGPool -> PGExecCtx
mkPGExecCtx isoLevel pool =
PGExecCtx
{ _pecRunReadOnly = (Q.runTx pool (isoLevel, Just Q.ReadOnly))
, _pecRunReadNoTx = (Q.runTx' pool)
, _pecRunReadWrite = (Q.runTx pool (isoLevel, Just Q.ReadWrite))
, _pecCheckHealth = checkDbConnection
}
where
checkDbConnection = do
e <- liftIO $ runExceptT $ Q.runTx' pool select1Query
pure $ isRight e
where
select1Query :: Q.TxE QErr Int
select1Query =
runIdentity . Q.getRow <$>
Q.withQE defaultTxErrorHandler [Q.sql| SELECT 1 |] () False
class (MonadError QErr m) => MonadTx m where
liftTx :: Q.TxE QErr a -> m a
@ -55,14 +82,16 @@ instance (Monoid w, MonadTx m) => MonadTx (WriterT w m) where
instance (MonadTx m) => MonadTx (ValidateT e m) where
liftTx = lift . liftTx
-- | Like 'Q.TxE', but defers acquiring a Postgres connection until the first execution of 'liftTx'.
-- If no call to 'liftTx' is ever reached (i.e. a successful result is returned or an error is
-- raised before ever executing a query), no connection is ever acquired.
-- | Like 'Q.TxE', but defers acquiring a Postgres connection until the first
-- execution of 'liftTx'. If no call to 'liftTx' is ever reached (i.e. a
-- successful result is returned or an error is raised before ever executing a
-- query), no connection is ever acquired.
--
-- This is useful for certain code paths that only conditionally need database access. For example,
-- although most queries will eventually hit Postgres, introspection queries or queries that
-- exclusively use remote schemas never will; using 'LazyTx' keeps those branches from unnecessarily
-- allocating a connection.
-- This is useful for certain code paths that only conditionally need database
-- access. For example, although most queries will eventually hit Postgres,
-- introspection queries or queries that exclusively use remote schemas never
-- will; using 'LazyTx' keeps those branches from unnecessarily allocating a
-- connection.
data LazyTx e a
= LTErr !e
| LTNoTx !a
@ -84,17 +113,22 @@ runLazyTx
=> PGExecCtx
-> Q.TxAccess
-> LazyTx QErr a -> ExceptT QErr m a
runLazyTx (PGExecCtx pgPool txIso) txAccess = \case
runLazyTx pgExecCtx txAccess = \case
LTErr e -> throwError e
LTNoTx a -> return a
LTTx tx -> ExceptT <$> liftIO $ runExceptT $ Q.runTx pgPool (txIso, Just txAccess) tx
LTTx tx ->
case txAccess of
Q.ReadOnly -> ExceptT <$> liftIO $ runExceptT $ _pecRunReadOnly pgExecCtx tx
Q.ReadWrite -> ExceptT <$> liftIO $ runExceptT $ _pecRunReadWrite pgExecCtx tx
runLazyTx'
-- | This runs the given set of statements (Tx) without wrapping them in BEGIN
-- and COMMIT. This should only be used for running a single statement query!
runQueryTx
:: MonadIO m => PGExecCtx -> LazyTx QErr a -> ExceptT QErr m a
runLazyTx' (PGExecCtx pgPool _) = \case
runQueryTx pgExecCtx = \case
LTErr e -> throwError e
LTNoTx a -> return a
LTTx tx -> ExceptT <$> liftIO $ runExceptT $ Q.runTx' pgPool tx
LTTx tx -> ExceptT <$> liftIO $ runExceptT $ _pecRunReadNoTx pgExecCtx tx
type RespTx = Q.TxE QErr EncJSON
type LazyRespTx = LazyTx QErr EncJSON

View File

@ -224,15 +224,15 @@ validateVariables
-> m (ValidatedVariables f)
validateVariables pgExecCtx variableValues = do
let valSel = mkValidationSel $ toList variableValues
Q.Discard () <- runTx' $ liftTx $
Q.Discard () <- runQueryTx_ $ liftTx $
Q.rawQE dataExnErrHandler (Q.fromBuilder $ toSQL valSel) [] False
pure . ValidatedVariables $ fmap (txtEncodedPGVal . pstValue) variableValues
where
mkExtrs = map (flip S.Extractor Nothing . toTxtValue)
mkValidationSel vars =
S.mkSelect { S.selExtr = mkExtrs vars }
runTx' tx = do
res <- liftIO $ runExceptT (runLazyTx' pgExecCtx tx)
runQueryTx_ tx = do
res <- liftIO $ runExceptT (runQueryTx pgExecCtx tx)
liftEither res
-- Explicitly look for the class of errors raised when the format of a value provided

View File

@ -408,7 +408,7 @@ pollQuery logger pollerId lqOpts pgExecCtx pgQuery cohortMap = do
-- concurrently process each batch
batchesDetails <- A.forConcurrently cohortBatches $ \cohorts -> do
(queryExecutionTime, mxRes) <- withElapsedTime $
runExceptT $ runLazyTx' pgExecCtx $
runExceptT $ runQueryTx pgExecCtx $
executeMultiplexedQuery pgQuery $ over (each._2) _csVariables cohorts
let lqMeta = LiveQueryMetadata $ convertDuration queryExecutionTime

View File

@ -129,7 +129,7 @@ runHasuraGQ reqId query userInfo resolvedOp = do
E.ExOpQuery tx genSql -> do
-- log the generated SQL and the graphql query
L.unLogger logger $ QueryLog query genSql reqId
([],) <$> runLazyTx' pgExecCtx tx
([],) <$> runQueryTx pgExecCtx tx
E.ExOpMutation respHeaders tx -> do
-- log the graphql query

View File

@ -342,7 +342,7 @@ onStart serverEnv wsConn (StartMsg opId q) = catchAndIgnore $ do
-> ExceptT () m ()
runHasuraGQ timerTot telemCacheHit reqId query userInfo = \case
E.ExOpQuery opTx genSql ->
execQueryOrMut Telem.Query genSql $ runLazyTx' pgExecCtx opTx
execQueryOrMut Telem.Query genSql $ runQueryTx pgExecCtx opTx
-- Response headers discarded over websockets
E.ExOpMutation _ opTx ->
execQueryOrMut Telem.Mutation Nothing $

View File

@ -9,7 +9,6 @@ import Control.Lens (view, _2)
import Control.Monad.Stateless
import Control.Monad.Trans.Control (MonadBaseControl)
import Data.Aeson hiding (json)
import Data.Either (isRight)
import Data.Int (Int64)
import Data.IORef
import Data.Time.Clock (UTCTime, getCurrentTime)
@ -35,6 +34,7 @@ import qualified System.Metrics.Json as EKG
import qualified Text.Mustache as M
import qualified Web.Spock.Core as Spock
import Hasura.Db
import Hasura.EncJSON
import Hasura.GraphQL.Resolve.Action
import Hasura.HTTP
@ -495,6 +495,7 @@ mkWaiApp
-> SQLGenCtx
-> Bool
-> Q.PGPool
-> Maybe PGExecCtx
-> Q.ConnInfo
-> HTTP.Manager
-> AuthMode
@ -508,14 +509,15 @@ mkWaiApp
-> E.PlanCacheOptions
-> ResponseInternalErrorsConfig
-> m HasuraApp
mkWaiApp isoLevel logger sqlGenCtx enableAL pool ci httpManager mode corsCfg enableConsole consoleAssetsDir
mkWaiApp isoLevel logger sqlGenCtx enableAL pool pgExecCtxCustom ci httpManager mode corsCfg enableConsole consoleAssetsDir
enableTelemetry instanceId apis lqOpts planCacheOptions responseErrorsConfig = do
(planCache, schemaCacheRef, cacheBuiltTime) <- migrateAndInitialiseSchemaCache
let getSchemaCache = first lastBuiltSchemaCache <$> readIORef (_scrCache schemaCacheRef)
let corsPolicy = mkDefaultCorsPolicy corsCfg
pgExecCtx = PGExecCtx pool isoLevel
pgExecCtx = fromMaybe (mkPGExecCtx isoLevel pool) pgExecCtxCustom
lqState <- liftIO $ EL.initLiveQueriesState lqOpts pgExecCtx
wsServerEnv <- WS.createWSServerEnv logger pgExecCtx lqState getSchemaCache httpManager
corsPolicy sqlGenCtx enableAL planCache
@ -560,7 +562,7 @@ mkWaiApp isoLevel logger sqlGenCtx enableAL pool ci httpManager mode corsCfg ena
migrateAndInitialiseSchemaCache :: m (E.PlanCache, SchemaCacheRef, Maybe UTCTime)
migrateAndInitialiseSchemaCache = do
let pgExecCtx = PGExecCtx pool Q.Serializable
let pgExecCtx = mkPGExecCtx Q.Serializable pool
adminRunCtx = RunCtx adminUserInfo httpManager sqlGenCtx
currentTime <- liftIO getCurrentTime
initialiseResult <- runExceptT $ peelRun adminRunCtx pgExecCtx Q.ReadWrite do
@ -612,7 +614,7 @@ httpApp corsCfg serverCtx enableConsole consoleAssetsDir enableTelemetry = do
-- Health check endpoint
Spock.get "healthz" $ do
sc <- getSCFromRef $ scCacheRef serverCtx
dbOk <- checkDbConnection
dbOk <- liftIO $ _pecCheckHealth $ scPGExecCtx serverCtx
if dbOk
then Spock.setStatus HTTP.status200 >> (Spock.text $ if null (scInconsistentObjs sc)
then "OK"
@ -703,14 +705,6 @@ httpApp corsCfg serverCtx enableConsole consoleAssetsDir enableTelemetry = do
enablePGDump = isPGDumpEnabled serverCtx
enableConfig = isConfigEnabled serverCtx
checkDbConnection = do
e <- liftIO $ runExceptT $ runLazyTx' (scPGExecCtx serverCtx) select1Query
pure $ isRight e
where
select1Query :: (MonadTx m) => m Int
select1Query = liftTx $ runIdentity . Q.getRow <$> Q.withQE defaultTxErrorHandler
[Q.sql| SELECT 1 |] () False
serveApiConsole = do
-- redirect / to /console
Spock.get Spock.root $ Spock.redirect "console"

View File

@ -2,9 +2,9 @@ module Hasura.Server.SchemaUpdate
(startSchemaSyncThreads)
where
import Hasura.Db
import Hasura.Prelude
import Hasura.Session
import Hasura.Logging
import Hasura.RQL.DDL.Schema (runCacheRWT)
import Hasura.RQL.Types
@ -228,7 +228,7 @@ refreshSchemaCache sqlGenCtx pool logger httpManager cacheRef invalidations thre
Right () -> logInfo logger threadType $ object ["message" .= msg]
where
runCtx = RunCtx adminUserInfo httpManager sqlGenCtx
pgCtx = PGExecCtx pool PG.Serializable
pgCtx = mkPGExecCtx PG.Serializable pool
logInfo :: Logger Hasura -> ThreadType -> Value -> IO ()
logInfo logger threadType val = unLogger logger $

View File

@ -17,7 +17,7 @@ import qualified Network.HTTP.Client as HTTP
import qualified Network.HTTP.Client.TLS as HTTP
import qualified Test.Hspec.Runner as Hspec
import Hasura.Db (PGExecCtx (..))
import Hasura.Db (mkPGExecCtx)
import Hasura.RQL.Types (SQLGenCtx (..))
import Hasura.RQL.Types.Run
import Hasura.Server.Init (RawConnInfo, mkConnInfo, mkRawConnInfo,
@ -79,7 +79,7 @@ buildPostgresSpecs pgConnOptions = do
httpManager <- HTTP.newManager HTTP.tlsManagerSettings
let runContext = RunCtx adminUserInfo httpManager (SQLGenCtx False)
pgContext = PGExecCtx pgPool Q.Serializable
pgContext = mkPGExecCtx Q.Serializable pgPool
runAsAdmin :: Run a -> IO a
runAsAdmin =