mirror of
https://github.com/hasura/graphql-engine.git
synced 2024-12-15 01:12:56 +03:00
chore(server): various code cleanups
- Derive a few `instance`s - Delete some dead code (methods and types) - Delete some `INLINE` pragmas that are unlikely to have a big effect - Monomorphize Postgres `LISTEN` code to avoid effect juggling - Generalize some methods in `pg-client` so that others can be simplified - Handle errors differently for `TxET` to deduplicate code - Use `hoist` instead of specialized combinators such as `mapActionT` PR-URL: https://github.com/hasura/graphql-engine-mono/pull/8130 GitOrigin-RevId: bc1e908b6c0869f440a214a76744e92d40fea1e6
This commit is contained in:
parent
f14b47cf42
commit
88488362e0
@ -59,7 +59,6 @@ library
|
||||
, aeson >=1.0
|
||||
, aeson-casing >=0.1
|
||||
, async >=2
|
||||
, attoparsec >=0.13
|
||||
, base >=4.7
|
||||
, bytestring >=0.10
|
||||
, ekg-core >=0.1
|
||||
|
@ -8,11 +8,8 @@
|
||||
{-# HLINT ignore "Use onLeft" #-}
|
||||
|
||||
module Database.PG.Query.Class
|
||||
( WithCount (..),
|
||||
WithReturning (..),
|
||||
FromCol (..),
|
||||
( FromCol (..),
|
||||
fromColHelper,
|
||||
FromRow (..),
|
||||
FromRes (..),
|
||||
ToPrepArg (..),
|
||||
toPrepValHelper,
|
||||
@ -34,7 +31,6 @@ import Control.Monad.Identity (Identity (..))
|
||||
import Control.Monad.Trans.Except (ExceptT (..))
|
||||
import Data.Aeson (FromJSON, ToJSON, Value, encode, parseJSON)
|
||||
import Data.Aeson.Types (parseEither)
|
||||
import Data.Attoparsec.ByteString.Char8 qualified as Atto
|
||||
import Data.Bifunctor (first)
|
||||
import Data.ByteString (ByteString, uncons)
|
||||
import Data.ByteString.Lazy qualified as Lazy (ByteString)
|
||||
@ -59,17 +55,6 @@ import Prelude
|
||||
|
||||
-------------------------------------------------------------------------------
|
||||
|
||||
data WithCount a = WithCount
|
||||
{ wcCount :: Word64,
|
||||
wcValue :: a
|
||||
}
|
||||
deriving stock (Eq, Show)
|
||||
|
||||
data WithReturning a = WithReturning
|
||||
{ wrCount :: Word64,
|
||||
wrResults :: Maybe a
|
||||
}
|
||||
|
||||
newtype SingleRow a = SingleRow
|
||||
{ getRow :: a
|
||||
}
|
||||
@ -183,36 +168,6 @@ instance FromRes Discard where
|
||||
newtype Discard = Discard ()
|
||||
deriving stock (Eq, Show)
|
||||
|
||||
parseWord64 :: ByteString -> Either Text Word64
|
||||
parseWord64 b = either buildE return parsed
|
||||
where
|
||||
parsed = Atto.parseOnly (Atto.decimal <* Atto.endOfInput) b
|
||||
buildE e = Left $ "Couldn't parse Word64: " <> fromString e
|
||||
|
||||
extractCount :: PQ.Result -> ExceptT Text IO Word64
|
||||
extractCount r = do
|
||||
cmd <- liftIO $ PQ.cmdTuples r
|
||||
case cmd of
|
||||
Just "" -> throwError "Affected rows information not found"
|
||||
Nothing -> throwError "Affected rows information not found"
|
||||
Just bs -> ExceptT $ return $ parseWord64 bs
|
||||
|
||||
instance FromRes a => FromRes (WithReturning a) where
|
||||
fromRes (ResultOkEmpty res) = do
|
||||
c <- extractCount res
|
||||
return $ WithReturning c Nothing
|
||||
fromRes rs@(ResultOkData res) = do
|
||||
c <- extractCount res
|
||||
r <- fromRes rs
|
||||
return $ WithReturning c $ Just r
|
||||
|
||||
instance FromRes a => FromRes (WithCount a) where
|
||||
fromRes resOk = do
|
||||
let res = getPQRes resOk
|
||||
a <- fromRes resOk
|
||||
c <- extractCount res
|
||||
return $ WithCount c a
|
||||
|
||||
type ResultMatrix = V.Vector ResultRow
|
||||
|
||||
type ResultRow = V.Vector (Maybe ByteString)
|
||||
|
@ -35,7 +35,6 @@ module Database.PG.Query.Connection
|
||||
PrepArg,
|
||||
prepare,
|
||||
execMulti,
|
||||
execCmd,
|
||||
execQuery,
|
||||
lenientDecodeUtf8,
|
||||
PGErrInternal (..),
|
||||
@ -529,7 +528,6 @@ instance ToJSON PGErrInternal where
|
||||
toJSON (PGIUnexpected msg) = toJSON msg
|
||||
toJSON (PGIStatement errDetail) = toJSON errDetail
|
||||
|
||||
{-# INLINE execQuery #-}
|
||||
execQuery ::
|
||||
PGConn ->
|
||||
PGQuery a ->
|
||||
@ -555,7 +553,6 @@ execQuery pgConn pgQuery = do
|
||||
mRes <- run $ PQ.execPrepared conn rk vl PQ.Binary
|
||||
checkResult conn mRes
|
||||
|
||||
{-# INLINE execMulti #-}
|
||||
execMulti ::
|
||||
PGConn ->
|
||||
Template ->
|
||||
@ -570,17 +567,3 @@ execMulti pgConn (Template t) convF = do
|
||||
withExceptT PGIUnexpected $ convF resOk
|
||||
where
|
||||
PGConn conn _ cancelable _ _ _ _ _ _ = pgConn
|
||||
|
||||
{-# INLINE execCmd #-}
|
||||
execCmd ::
|
||||
PGConn ->
|
||||
Template ->
|
||||
ExceptT PGErrInternal IO ()
|
||||
execCmd pgConn (Template t) =
|
||||
retryOnConnErr pgConn $ do
|
||||
mRes <-
|
||||
bool lift (cancelOnAsync conn) cancelable $
|
||||
PQ.execParams conn t [] PQ.Binary
|
||||
assertResCmd conn mRes
|
||||
where
|
||||
PGConn conn _ cancelable _ _ _ _ _ _ = pgConn
|
||||
|
@ -3,6 +3,7 @@
|
||||
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
|
||||
{-# LANGUAGE MultiParamTypeClasses #-}
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
{-# LANGUAGE TypeApplications #-}
|
||||
{-# OPTIONS_GHC -Wno-unrecognised-pragmas #-}
|
||||
|
||||
{-# HLINT ignore "Use onLeft" #-}
|
||||
@ -22,12 +23,7 @@ where
|
||||
|
||||
import Control.Concurrent (threadWaitRead)
|
||||
import Control.Exception.Safe (displayException, try)
|
||||
import Control.Monad (forever, unless)
|
||||
import Control.Monad.Except (MonadError (throwError))
|
||||
import Control.Monad.IO.Class (MonadIO (liftIO))
|
||||
import Control.Monad.Trans.Class (lift)
|
||||
import Control.Monad.Trans.Control (MonadBaseControl)
|
||||
import Control.Monad.Trans.Except (ExceptT, runExceptT)
|
||||
import Control.Monad.Except
|
||||
import Data.Foldable
|
||||
import Data.String (IsString)
|
||||
import Data.Text qualified as T
|
||||
@ -55,47 +51,41 @@ listen ::
|
||||
( FromPGConnErr e,
|
||||
FromPGTxErr e,
|
||||
MonadError e m,
|
||||
MonadIO m,
|
||||
MonadBaseControl IO m
|
||||
MonadIO m
|
||||
) =>
|
||||
PGPool ->
|
||||
PGChannel ->
|
||||
NotifyHandler ->
|
||||
m ()
|
||||
listen pool channel handler = catchConnErr $
|
||||
withExpiringPGconn pool $ \pgConn -> do
|
||||
let conn = pgPQConn pgConn
|
||||
listen pool channel handler = (>>= liftEither) $ liftIO $ runExceptT $ withConn pool $ \pgConn -> do
|
||||
let conn = pgPQConn pgConn
|
||||
|
||||
-- Issue listen command
|
||||
eRes <-
|
||||
liftIO $
|
||||
runExceptT $
|
||||
execMulti pgConn (mkTemplate listenCmd) $
|
||||
const $
|
||||
return ()
|
||||
either throwTxErr return eRes
|
||||
-- Emit onStart event
|
||||
liftIO $ handler PNEOnStart
|
||||
forever $ do
|
||||
-- Make postgres connection ready for reading
|
||||
r <- liftIO $ runExceptT $ waitForReadReadiness conn
|
||||
either (throwError . fromPGConnErr) return r
|
||||
-- Check for input
|
||||
success <- liftIO $ PQ.consumeInput conn
|
||||
unless success $ throwConsumeFailed conn
|
||||
liftIO $ processNotifs conn
|
||||
-- Issue listen command
|
||||
withExceptT (fromPGTxErr . handleTxErr) $
|
||||
execMulti pgConn (mkTemplate listenCmd) $
|
||||
const $
|
||||
return ()
|
||||
-- Emit onStart event
|
||||
lift $ handler PNEOnStart
|
||||
forever $ withExceptT fromPGConnErr $ do
|
||||
-- Make postgres connection ready for reading
|
||||
waitForReadReadiness conn
|
||||
-- Check for input
|
||||
success <- lift $ PQ.consumeInput conn
|
||||
unless success $ throwConsumeFailed conn
|
||||
lift $ processNotifs conn
|
||||
where
|
||||
listenCmd = "LISTEN " <> getChannelTxt channel <> ";"
|
||||
throwTxErr =
|
||||
throwError . fromPGTxErr . PGTxErr listenCmd [] False
|
||||
handleTxErr =
|
||||
PGTxErr listenCmd [] False
|
||||
throwConsumeFailed conn = do
|
||||
msg <- liftIO $ readConnErr conn
|
||||
throwError $ fromPGConnErr $ PGConnErr msg
|
||||
msg <- lift $ readConnErr conn
|
||||
throwError $ PGConnErr msg
|
||||
|
||||
processNotifs conn = do
|
||||
-- Collect notification
|
||||
-- Collect a notification
|
||||
mNotify <- PQ.notifies conn
|
||||
for_ mNotify $ \n -> do
|
||||
for_ @Maybe mNotify $ \n -> do
|
||||
-- Apply notify handler on arrived notification
|
||||
handler $ PNEPQNotify n
|
||||
-- Process remaining notifications if any
|
||||
@ -107,8 +97,7 @@ waitForReadReadiness conn = do
|
||||
mFd <- lift $ PQ.socket conn
|
||||
fd <- maybe (throwError $ PGConnErr "connection is not currently open") pure mFd
|
||||
-- Wait for the socket to be ready for reading
|
||||
waitResult <- lift . try $ threadWaitRead fd
|
||||
either (throwError . ioErrorToPGConnErr) return waitResult
|
||||
withExceptT ioErrorToPGConnErr $ ExceptT $ try $ threadWaitRead fd
|
||||
where
|
||||
ioErrorToPGConnErr :: IOError -> PGConnErr
|
||||
ioErrorToPGConnErr = PGConnErr . T.pack . displayException
|
||||
|
@ -13,18 +13,13 @@ module Database.PG.Query.Pool
|
||||
pgPoolStats,
|
||||
PGPoolStats (..),
|
||||
getInUseConnections,
|
||||
withExpiringPGconn,
|
||||
defaultConnParams,
|
||||
initPGPool,
|
||||
resizePGPool,
|
||||
destroyPGPool,
|
||||
withConn,
|
||||
beginTx,
|
||||
abortTx,
|
||||
commitTx,
|
||||
runTx,
|
||||
runTx',
|
||||
catchConnErr,
|
||||
sql,
|
||||
sqlFromFile,
|
||||
PGExecErr (..),
|
||||
@ -44,7 +39,7 @@ import Control.Monad (when)
|
||||
import Control.Monad.Except (MonadError (catchError, throwError))
|
||||
import Control.Monad.IO.Class (MonadIO (liftIO))
|
||||
import Control.Monad.Trans.Control (MonadBaseControl, control)
|
||||
import Control.Monad.Trans.Except (ExceptT, withExceptT)
|
||||
import Control.Monad.Trans.Except (ExceptT)
|
||||
import Data.Aeson (ToJSON (toJSON))
|
||||
import Data.ByteString qualified as BS
|
||||
import Data.HashTable.IO qualified as HIO
|
||||
@ -163,25 +158,31 @@ instance FromPGTxErr PGExecErr where
|
||||
instance FromPGConnErr PGExecErr where
|
||||
fromPGConnErr = PGExecErrConn
|
||||
|
||||
instance FromPGTxErr PGTxErr where
|
||||
fromPGTxErr = id
|
||||
|
||||
instance FromPGConnErr PGConnErr where
|
||||
fromPGConnErr = id
|
||||
|
||||
instance Show PGExecErr where
|
||||
show (PGExecErrConn pce) = show pce
|
||||
show (PGExecErrTx txe) = show txe
|
||||
|
||||
beginTx :: (MonadIO m) => TxMode -> TxT m ()
|
||||
beginTx :: (MonadIO m, FromPGTxErr e) => TxMode -> TxET e m ()
|
||||
beginTx (i, w) =
|
||||
unitQ query () True
|
||||
unitQE fromPGTxErr query () True
|
||||
where
|
||||
query =
|
||||
fromText . Text.pack $
|
||||
("BEGIN " <> show i <> " " <> maybe "" show w)
|
||||
|
||||
commitTx :: (MonadIO m) => TxT m ()
|
||||
commitTx :: (MonadIO m, FromPGTxErr e) => TxET e m ()
|
||||
commitTx =
|
||||
unitQ "COMMIT" () True
|
||||
unitQE fromPGTxErr "COMMIT" () True
|
||||
|
||||
abortTx :: (MonadIO m) => TxT m ()
|
||||
abortTx :: (MonadIO m, FromPGTxErr e) => TxET e m ()
|
||||
abortTx =
|
||||
unitQ "ABORT" () True
|
||||
unitQE fromPGTxErr "ABORT" () True
|
||||
|
||||
class FromPGTxErr e where
|
||||
fromPGTxErr :: PGTxErr -> e
|
||||
@ -197,15 +198,15 @@ asTransaction ::
|
||||
ExceptT e m a
|
||||
asTransaction txm f pgConn = do
|
||||
-- Begin the transaction. If there is an error, you shouldn't call abort
|
||||
withExceptT fromPGTxErr $ execTx pgConn $ beginTx txm
|
||||
execTx pgConn $ beginTx txm
|
||||
-- Run the actual transaction and commit. If there is an error, abort
|
||||
flip catchError abort $ do
|
||||
a <- f pgConn
|
||||
withExceptT fromPGTxErr $ execTx pgConn commitTx
|
||||
execTx pgConn commitTx
|
||||
return a
|
||||
where
|
||||
abort e = do
|
||||
withExceptT fromPGTxErr $ execTx pgConn abortTx
|
||||
execTx pgConn abortTx
|
||||
throwError e
|
||||
|
||||
-- | Run a command using the postgres pool.
|
||||
@ -215,11 +216,12 @@ asTransaction txm f pgConn = do
|
||||
withConn ::
|
||||
( MonadIO m,
|
||||
MonadBaseControl IO m,
|
||||
MonadError e m,
|
||||
FromPGConnErr e
|
||||
) =>
|
||||
PGPool ->
|
||||
(PGConn -> ExceptT e m a) ->
|
||||
ExceptT e m a
|
||||
(PGConn -> m a) ->
|
||||
m a
|
||||
withConn pool f =
|
||||
catchConnErr $ withExpiringPGconn pool f
|
||||
|
||||
@ -231,24 +233,17 @@ catchConnErr ::
|
||||
catchConnErr action =
|
||||
control $ \runInIO ->
|
||||
runInIO action
|
||||
`Exc.catches` [ Handler (runInIO . handler),
|
||||
`Exc.catches` [ Handler (runInIO . handlePGConnErr),
|
||||
Handler (runInIO . handleTimeout)
|
||||
]
|
||||
where
|
||||
handler = mkConnExHandler action fromPGConnErr
|
||||
handlePGConnErr :: PGConnErr -> m a
|
||||
handlePGConnErr = throwError . fromPGConnErr
|
||||
|
||||
handleTimeout :: RP.TimeoutException -> m a
|
||||
handleTimeout _ =
|
||||
handleTimeout RP.TimeoutException =
|
||||
throwError (fromPGConnErr $ PGConnErr "connection acquisition timeout expired")
|
||||
|
||||
{-# INLINE mkConnExHandler #-}
|
||||
mkConnExHandler ::
|
||||
(MonadError e m) =>
|
||||
m a ->
|
||||
(PGConnErr -> e) ->
|
||||
(PGConnErr -> m a)
|
||||
mkConnExHandler _ ef = throwError . ef
|
||||
|
||||
-- | Run a command on the given pool wrapped in a transaction.
|
||||
runTx ::
|
||||
( MonadIO m,
|
||||
|
@ -1,10 +1,11 @@
|
||||
{-# LANGUAGE DeriveLift #-}
|
||||
{-# LANGUAGE DerivingStrategies #-}
|
||||
{-# LANGUAGE DerivingVia #-}
|
||||
{-# LANGUAGE FlexibleContexts #-}
|
||||
{-# LANGUAGE FlexibleInstances #-}
|
||||
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
|
||||
{-# LANGUAGE MultiParamTypeClasses #-}
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
{-# LANGUAGE StandaloneDeriving #-}
|
||||
{-# LANGUAGE TypeFamilies #-}
|
||||
{-# LANGUAGE UndecidableInstances #-}
|
||||
|
||||
@ -17,17 +18,13 @@ module Database.PG.Query.Transaction
|
||||
TxET (..),
|
||||
TxE,
|
||||
TxT,
|
||||
withNotices,
|
||||
withQ,
|
||||
withQE,
|
||||
rawQE,
|
||||
unitQ,
|
||||
unitQE,
|
||||
multiQE,
|
||||
discardQE,
|
||||
serverVersion,
|
||||
execTx,
|
||||
catchE,
|
||||
Query,
|
||||
fromText,
|
||||
fromBuilder,
|
||||
@ -37,16 +34,15 @@ where
|
||||
|
||||
-------------------------------------------------------------------------------
|
||||
|
||||
import Control.Monad.Base (MonadBase (liftBase))
|
||||
import Control.Monad.Base (MonadBase)
|
||||
import Control.Monad.Except (MonadError)
|
||||
import Control.Monad.Fix (MonadFix)
|
||||
import Control.Monad.IO.Class (MonadIO (liftIO))
|
||||
import Control.Monad.Morph (MFunctor, hoist)
|
||||
import Control.Monad.Morph (MFunctor (..), MonadTrans (..))
|
||||
import Control.Monad.Reader (MonadReader, asks)
|
||||
import Control.Monad.Trans.Class (MonadTrans (lift))
|
||||
import Control.Monad.Trans.Control (MonadBaseControl (StM, liftBaseWith, restoreM))
|
||||
import Control.Monad.Trans.Control (MonadBaseControl)
|
||||
import Control.Monad.Trans.Except (ExceptT, withExceptT)
|
||||
import Control.Monad.Trans.Reader (ReaderT (..), mapReaderT, runReaderT)
|
||||
import Control.Monad.Trans.Reader (ReaderT (..))
|
||||
import Data.Aeson (ToJSON (toJSON), object, (.=))
|
||||
import Data.Aeson.Text (encodeToLazyText)
|
||||
import Data.Hashable (Hashable)
|
||||
@ -68,7 +64,6 @@ data TxIsolation
|
||||
deriving stock (Eq, Lift)
|
||||
|
||||
instance Show TxIsolation where
|
||||
{-# INLINE show #-}
|
||||
show ReadCommitted = "ISOLATION LEVEL READ COMMITTED"
|
||||
show RepeatableRead = "ISOLATION LEVEL REPEATABLE READ"
|
||||
show Serializable = "ISOLATION LEVEL SERIALIZABLE"
|
||||
@ -79,7 +74,6 @@ data TxAccess
|
||||
deriving stock (Eq, Lift)
|
||||
|
||||
instance Show TxAccess where
|
||||
{-# INLINE show #-}
|
||||
show ReadWrite = "READ WRITE"
|
||||
show ReadOnly = "READ ONLY"
|
||||
|
||||
@ -104,22 +98,14 @@ instance MonadTrans (TxET e) where
|
||||
instance MFunctor (TxET e) where
|
||||
hoist f = TxET . hoist (hoist f) . txHandler
|
||||
|
||||
instance (MonadBase IO m) => MonadBase IO (TxET e m) where
|
||||
liftBase = lift . liftBase
|
||||
deriving via (ReaderT PGConn (ExceptT e m)) instance MonadBase IO m => MonadBase IO (TxET e m)
|
||||
|
||||
instance (MonadBaseControl IO m) => MonadBaseControl IO (TxET e m) where
|
||||
type StM (TxET e m) a = StM (ReaderT PGConn (ExceptT e m)) a
|
||||
liftBaseWith f = TxET $ liftBaseWith $ \run -> f (run . txHandler)
|
||||
restoreM = TxET . restoreM
|
||||
deriving via (ReaderT PGConn (ExceptT e m)) instance MonadBaseControl IO m => MonadBaseControl IO (TxET e m)
|
||||
|
||||
type TxE e a = TxET e IO a
|
||||
|
||||
type TxT m a = TxET PGTxErr m a
|
||||
|
||||
{-# INLINE catchE #-}
|
||||
catchE :: (Functor m) => (e -> e') -> TxET e m a -> TxET e' m a
|
||||
catchE f action = TxET $ mapReaderT (withExceptT f) $ txHandler action
|
||||
|
||||
data PGTxErr = PGTxErr
|
||||
{ pgteStatement :: !Text,
|
||||
pgteArguments :: ![PrepArg],
|
||||
@ -129,7 +115,6 @@ data PGTxErr = PGTxErr
|
||||
-- PGCustomErr !T.Text
|
||||
deriving stock (Eq)
|
||||
|
||||
{-# INLINE getPGStmtErr #-}
|
||||
getPGStmtErr :: PGTxErr -> Maybe PGStmtErrDetail
|
||||
getPGStmtErr (PGTxErr _ _ _ ei) = case ei of
|
||||
PGIStatement e -> return e
|
||||
@ -147,7 +132,6 @@ instance ToJSON PGTxErr where
|
||||
instance Show PGTxErr where
|
||||
show = show . encodeToLazyText
|
||||
|
||||
{-# INLINE execTx #-}
|
||||
execTx :: PGConn -> TxET e m a -> ExceptT e m a
|
||||
execTx conn tx = runReaderT (txHandler tx) conn
|
||||
|
||||
@ -157,22 +141,12 @@ newtype Query = Query
|
||||
deriving stock (Eq, Show)
|
||||
deriving newtype (Hashable, IsString, ToJSON)
|
||||
|
||||
{-# INLINE fromText #-}
|
||||
fromText :: Text -> Query
|
||||
fromText = Query
|
||||
|
||||
{-# INLINE fromBuilder #-}
|
||||
fromBuilder :: TB.Builder -> Query
|
||||
fromBuilder = Query . TB.run
|
||||
|
||||
withQ ::
|
||||
(MonadIO m, FromRes a, ToPrepArgs r) =>
|
||||
Query ->
|
||||
r ->
|
||||
Bool ->
|
||||
TxT m a
|
||||
withQ = withQE id
|
||||
|
||||
withQE ::
|
||||
(MonadIO m, FromRes a, ToPrepArgs r) =>
|
||||
(PGTxErr -> e) ->
|
||||
@ -215,33 +189,6 @@ multiQE ef q = TxET $
|
||||
txErrF = PGTxErr stmt [] False
|
||||
stmt = getQueryText q
|
||||
|
||||
withNotices :: (MonadIO m) => TxT m a -> TxT m (a, [Text])
|
||||
withNotices tx = do
|
||||
conn <- asks pgPQConn
|
||||
setToNotice
|
||||
liftIO $ PQ.enableNoticeReporting conn
|
||||
a <- tx
|
||||
notices <- liftIO $ getNotices conn []
|
||||
liftIO $ PQ.disableNoticeReporting conn
|
||||
setToWarn
|
||||
return (a, map lenientDecodeUtf8 notices)
|
||||
where
|
||||
setToNotice = unitQE id "SET client_min_messages TO NOTICE;" () False
|
||||
setToWarn = unitQE id "SET client_min_messages TO WARNING;" () False
|
||||
getNotices conn xs = do
|
||||
notice <- PQ.getNotice conn
|
||||
case notice of
|
||||
Nothing -> return xs
|
||||
Just bs -> getNotices conn (bs : xs)
|
||||
|
||||
unitQ ::
|
||||
(MonadIO m, ToPrepArgs r) =>
|
||||
Query ->
|
||||
r ->
|
||||
Bool ->
|
||||
TxT m ()
|
||||
unitQ = withQ
|
||||
|
||||
unitQE ::
|
||||
(MonadIO m, ToPrepArgs r) =>
|
||||
(PGTxErr -> e) ->
|
||||
|
@ -258,8 +258,7 @@ enablePgcryptoExtension (ExtensionsSchema extensionsSchema) = do
|
||||
dropHdbCatalogSchema :: (MonadTx m) => m ()
|
||||
dropHdbCatalogSchema =
|
||||
liftTx $
|
||||
PG.catchE defaultTxErrorHandler $
|
||||
-- This is where
|
||||
-- 1. Metadata storage:- Metadata and its stateful information stored
|
||||
-- 2. Postgres source:- Table event trigger related stuff & insert permission check function stored
|
||||
PG.unitQ "DROP SCHEMA IF EXISTS hdb_catalog CASCADE" () False
|
||||
-- This is where
|
||||
-- 1. Metadata storage:- Metadata and its stateful information stored
|
||||
-- 2. Postgres source:- Table event trigger related stuff & insert permission check function stored
|
||||
PG.unitQE defaultTxErrorHandler "DROP SCHEMA IF EXISTS hdb_catalog CASCADE" () False
|
||||
|
@ -22,6 +22,7 @@ where
|
||||
|
||||
import Control.Concurrent.Extended (sleep)
|
||||
import Control.Concurrent.STM qualified as STM
|
||||
import Control.Monad.Morph (hoist)
|
||||
import Control.Monad.Trans.Control qualified as MC
|
||||
import Data.Aeson qualified as J
|
||||
import Data.Aeson.Casing qualified as J
|
||||
@ -772,7 +773,7 @@ onStart env enabledLogTypes serverEnv wsConn shouldCaptureVariables (StartMsg op
|
||||
(telemTimeIO_DT, _respHdrs, resp) <-
|
||||
doQErr $
|
||||
E.execRemoteGQ env userInfo reqHdrs (rsDef rsi) gqlReq
|
||||
value <- mapExceptT lift $ extractFieldFromResponse fieldName resultCustomizer resp
|
||||
value <- hoist lift $ extractFieldFromResponse fieldName resultCustomizer resp
|
||||
finalResponse <-
|
||||
doQErr $
|
||||
RJ.processRemoteJoins
|
||||
|
@ -399,7 +399,7 @@ parseLegacyRemoteRelationshipDefinition =
|
||||
|
||||
fetchMetadataFromHdbTables :: MonadTx m => m MetadataNoSources
|
||||
fetchMetadataFromHdbTables = liftTx do
|
||||
tables <- PG.catchE defaultTxErrorHandler fetchTables
|
||||
tables <- fetchTables
|
||||
let tableMetaMap = OMap.fromList . flip map tables $
|
||||
\(schema, name, isEnum, maybeConfig) ->
|
||||
let qualifiedName = QualifiedObject schema name
|
||||
@ -407,13 +407,13 @@ fetchMetadataFromHdbTables = liftTx do
|
||||
in (qualifiedName, mkTableMeta qualifiedName isEnum configuration)
|
||||
|
||||
-- Fetch all the relationships
|
||||
relationships <- PG.catchE defaultTxErrorHandler fetchRelationships
|
||||
relationships <- fetchRelationships
|
||||
|
||||
objRelDefs <- mkRelDefs ObjRel relationships
|
||||
arrRelDefs <- mkRelDefs ArrRel relationships
|
||||
|
||||
-- Fetch all the permissions
|
||||
permissions <- PG.catchE defaultTxErrorHandler fetchPermissions
|
||||
permissions <- fetchPermissions
|
||||
|
||||
-- Parse all the permissions
|
||||
insPermDefs <- mkPermDefs PTInsert permissions
|
||||
@ -422,14 +422,14 @@ fetchMetadataFromHdbTables = liftTx do
|
||||
delPermDefs <- mkPermDefs PTDelete permissions
|
||||
|
||||
-- Fetch all event triggers
|
||||
eventTriggers :: [(SchemaName, Postgres.TableName, PG.ViaJSON Value)] <- PG.catchE defaultTxErrorHandler fetchEventTriggers
|
||||
eventTriggers :: [(SchemaName, Postgres.TableName, PG.ViaJSON Value)] <- fetchEventTriggers
|
||||
triggerMetaDefs <- mkTriggerMetaDefs eventTriggers
|
||||
|
||||
-- Fetch all computed fields
|
||||
computedFields <- fetchComputedFields
|
||||
|
||||
-- Fetch all remote relationships
|
||||
remoteRelationshipsRaw <- PG.catchE defaultTxErrorHandler fetchRemoteRelationships
|
||||
remoteRelationshipsRaw <- fetchRemoteRelationships
|
||||
remoteRelationships <- for remoteRelationshipsRaw $ \(table, relationshipName, definitionValue) -> do
|
||||
definition <- parseLegacyRemoteRelationshipDefinition definitionValue
|
||||
pure $ (table, RemoteRelationship relationshipName definition)
|
||||
@ -445,7 +445,7 @@ fetchMetadataFromHdbTables = liftTx do
|
||||
modMetaMap tmComputedFields _cfmName computedFields
|
||||
modMetaMap tmRemoteRelationships _rrName remoteRelationships
|
||||
|
||||
functions <- PG.catchE defaultTxErrorHandler fetchFunctions
|
||||
functions <- fetchFunctions
|
||||
remoteSchemas <- oMapFromL _rsmName <$> fetchRemoteSchemas
|
||||
collections <- oMapFromL _ccName <$> fetchCollections
|
||||
allowlist <- oMapFromL aeCollection <$> fetchAllowlist
|
||||
@ -487,7 +487,8 @@ fetchMetadataFromHdbTables = liftTx do
|
||||
return (QualifiedObject sn tn, conf)
|
||||
|
||||
fetchTables =
|
||||
PG.withQ
|
||||
PG.withQE
|
||||
defaultTxErrorHandler
|
||||
[PG.sql|
|
||||
SELECT table_schema, table_name, is_enum, configuration::json
|
||||
FROM hdb_catalog.hdb_table
|
||||
@ -498,7 +499,8 @@ fetchMetadataFromHdbTables = liftTx do
|
||||
False
|
||||
|
||||
fetchRelationships =
|
||||
PG.withQ
|
||||
PG.withQE
|
||||
defaultTxErrorHandler
|
||||
[PG.sql|
|
||||
SELECT table_schema, table_name, rel_name, rel_type, rel_def::json, comment
|
||||
FROM hdb_catalog.hdb_relationship
|
||||
@ -509,7 +511,8 @@ fetchMetadataFromHdbTables = liftTx do
|
||||
False
|
||||
|
||||
fetchPermissions =
|
||||
PG.withQ
|
||||
PG.withQE
|
||||
defaultTxErrorHandler
|
||||
[PG.sql|
|
||||
SELECT table_schema, table_name, role_name, perm_type, perm_def::json, comment
|
||||
FROM hdb_catalog.hdb_permission
|
||||
@ -520,7 +523,8 @@ fetchMetadataFromHdbTables = liftTx do
|
||||
False
|
||||
|
||||
fetchEventTriggers =
|
||||
PG.withQ
|
||||
PG.withQE
|
||||
defaultTxErrorHandler
|
||||
[PG.sql|
|
||||
SELECT e.schema_name, e.table_name, e.configuration::json
|
||||
FROM hdb_catalog.event_triggers e
|
||||
@ -531,7 +535,8 @@ fetchMetadataFromHdbTables = liftTx do
|
||||
|
||||
fetchFunctions = do
|
||||
l <-
|
||||
PG.withQ
|
||||
PG.withQE
|
||||
defaultTxErrorHandler
|
||||
[PG.sql|
|
||||
SELECT function_schema, function_name, configuration::json
|
||||
FROM hdb_catalog.hdb_function
|
||||
@ -690,7 +695,8 @@ fetchMetadataFromHdbTables = liftTx do
|
||||
|
||||
fetchRemoteRelationships = do
|
||||
r <-
|
||||
PG.withQ
|
||||
PG.withQE
|
||||
defaultTxErrorHandler
|
||||
[PG.sql|
|
||||
SELECT table_schema, table_name,
|
||||
remote_relationship_name, definition::json
|
||||
|
@ -24,12 +24,11 @@ data RunCtx = RunCtx
|
||||
_rcServerConfigCtx :: ServerConfigCtx
|
||||
}
|
||||
|
||||
newtype RunT m a = RunT {unRunT :: ReaderT RunCtx m a}
|
||||
newtype RunT m a = RunT {unRunT :: RunCtx -> m a}
|
||||
deriving
|
||||
( Functor,
|
||||
Applicative,
|
||||
Monad,
|
||||
MonadReader RunCtx,
|
||||
MonadError e,
|
||||
MonadIO,
|
||||
Tracing.MonadTrace,
|
||||
@ -37,32 +36,22 @@ newtype RunT m a = RunT {unRunT :: ReaderT RunCtx m a}
|
||||
MonadBaseControl b,
|
||||
MonadMetadataStorage,
|
||||
MonadMetadataStorageQueryAPI,
|
||||
ProvidesNetwork
|
||||
ProvidesNetwork,
|
||||
MonadResolveSource,
|
||||
MonadEventLogCleanup,
|
||||
MonadGetApiTimeLimit
|
||||
)
|
||||
|
||||
instance MonadTrans RunT where
|
||||
lift = RunT . lift
|
||||
via (ReaderT RunCtx m)
|
||||
deriving (MonadTrans) via (ReaderT RunCtx)
|
||||
|
||||
instance (Monad m) => UserInfoM (RunT m) where
|
||||
askUserInfo = asks _rcUserInfo
|
||||
askUserInfo = RunT $ pure . _rcUserInfo
|
||||
|
||||
instance (Monad m) => HasServerConfigCtx (RunT m) where
|
||||
askServerConfigCtx = asks _rcServerConfigCtx
|
||||
|
||||
instance (MonadResolveSource m) => MonadResolveSource (RunT m) where
|
||||
getPGSourceResolver = lift getPGSourceResolver
|
||||
getMSSQLSourceResolver = lift getMSSQLSourceResolver
|
||||
|
||||
instance (MonadEventLogCleanup m) => MonadEventLogCleanup (RunT m) where
|
||||
runLogCleaner conf = lift $ runLogCleaner conf
|
||||
generateCleanupSchedules sInfo tName cConf = lift $ generateCleanupSchedules sInfo tName cConf
|
||||
updateTriggerCleanupSchedules logger oldSources newSources schemaCache = lift $ updateTriggerCleanupSchedules logger oldSources newSources schemaCache
|
||||
|
||||
instance (MonadGetApiTimeLimit m) => MonadGetApiTimeLimit (RunT m) where
|
||||
runGetApiTimeLimit = RunT . lift $ runGetApiTimeLimit
|
||||
askServerConfigCtx = RunT $ pure . _rcServerConfigCtx
|
||||
|
||||
peelRun ::
|
||||
RunCtx ->
|
||||
RunT m a ->
|
||||
m a
|
||||
peelRun runCtx (RunT m) = runReaderT m runCtx
|
||||
peelRun = flip unRunT
|
||||
|
@ -25,9 +25,9 @@ where
|
||||
|
||||
import Control.Concurrent.Async.Lifted.Safe qualified as LA
|
||||
import Control.Exception (IOException, try)
|
||||
import Control.Monad.Morph (hoist)
|
||||
import Control.Monad.Stateless
|
||||
import Control.Monad.Trans.Control (MonadBaseControl)
|
||||
import Control.Monad.Trans.Control qualified as MTC
|
||||
import Data.Aeson hiding (json)
|
||||
import Data.Aeson qualified as J
|
||||
import Data.Aeson.Key qualified as K
|
||||
@ -260,13 +260,6 @@ class Monad m => MonadConfigApiHandler m where
|
||||
AppEnv ->
|
||||
Spock.SpockCtxT () m ()
|
||||
|
||||
mapActionT ::
|
||||
(Monad m, Monad n) =>
|
||||
(m (MTC.StT (Spock.ActionCtxT ()) a) -> n (MTC.StT (Spock.ActionCtxT ()) a)) ->
|
||||
Spock.ActionT m a ->
|
||||
Spock.ActionT n a
|
||||
mapActionT f tma = MTC.restoreT . pure =<< MTC.liftWith (\run -> f (run tma))
|
||||
|
||||
mkSpockAction ::
|
||||
forall m a.
|
||||
( MonadIO m,
|
||||
@ -335,7 +328,7 @@ mkSpockAction appCtx@AppContext {..} appEnv@AppEnv {..} qErrEncoder qErrModifier
|
||||
extraUserInfo
|
||||
)
|
||||
|
||||
mapActionT runTrace do
|
||||
hoist runTrace do
|
||||
-- Add the request ID to the tracing metadata so that we
|
||||
-- can correlate requests and traces
|
||||
lift $ Tracing.attachMetadata [("request_id", unRequestId requestId)]
|
||||
@ -378,8 +371,8 @@ mkSpockAction appCtx@AppContext {..} appEnv@AppEnv {..} qErrEncoder qErrModifier
|
||||
logSuccessAndResp (Just userInfo) requestId req (reqBody, queryJSON) res (Just (ioWaitTime, serviceTime)) origHeaders authHeaders httpLogMetadata
|
||||
where
|
||||
logErrorAndResp ::
|
||||
forall m3 a3 ctx.
|
||||
(MonadIO m3, HttpLog m3) =>
|
||||
forall n a3 ctx.
|
||||
(MonadIO n, HttpLog n) =>
|
||||
Maybe UserInfo ->
|
||||
RequestId ->
|
||||
Wai.Request ->
|
||||
@ -388,9 +381,9 @@ mkSpockAction appCtx@AppContext {..} appEnv@AppEnv {..} qErrEncoder qErrModifier
|
||||
[HTTP.Header] ->
|
||||
ExtraUserInfo ->
|
||||
QErr ->
|
||||
Spock.ActionCtxT ctx m3 a3
|
||||
Spock.ActionCtxT ctx n a3
|
||||
logErrorAndResp userInfo reqId waiReq req includeInternal headers extraUserInfo qErr = do
|
||||
let httpLogMetadata = buildHttpLogMetadata @m3 emptyHttpLogGraphQLInfo extraUserInfo
|
||||
let httpLogMetadata = buildHttpLogMetadata @n emptyHttpLogGraphQLInfo extraUserInfo
|
||||
jsonResponse = J.encode $ qErrEncoder includeInternal qErr
|
||||
contentLength = ("Content-Length", B8.toStrict $ BB.toLazyByteString $ BB.int64Dec $ BL.length jsonResponse)
|
||||
allHeaders = [contentLength, jsonHeader]
|
||||
|
@ -142,9 +142,8 @@ migrateCatalog maybeDefaultSourceConfig extensionsSchema maintenanceMode migrati
|
||||
initialize :: Bool -> m MigrationResult
|
||||
initialize createSchema = do
|
||||
liftTx $
|
||||
PG.catchE defaultTxErrorHandler $
|
||||
when createSchema $
|
||||
PG.unitQ "CREATE SCHEMA hdb_catalog" () False
|
||||
when createSchema $
|
||||
PG.unitQE defaultTxErrorHandler "CREATE SCHEMA hdb_catalog" () False
|
||||
enablePgcryptoExtension extensionsSchema
|
||||
multiQ $(makeRelativeToProject "src-rsr/initialise.sql" >>= PG.sqlFromFile)
|
||||
updateCatalogVersion
|
||||
|
@ -35,25 +35,27 @@ getCatalogVersion = do
|
||||
\err -> throw500 $ "Unexpected: couldn't convert read catalog version " <> versionText <> ", err:" <> tshow err
|
||||
|
||||
from3To4 :: forall m. (Backend ('Postgres 'Vanilla), MonadTx m) => m ()
|
||||
from3To4 = liftTx $
|
||||
PG.catchE defaultTxErrorHandler $ do
|
||||
PG.unitQ
|
||||
[PG.sql|
|
||||
from3To4 = liftTx do
|
||||
PG.unitQE
|
||||
defaultTxErrorHandler
|
||||
[PG.sql|
|
||||
ALTER TABLE hdb_catalog.event_triggers
|
||||
ADD COLUMN configuration JSON |]
|
||||
()
|
||||
False
|
||||
eventTriggers <-
|
||||
map uncurryEventTrigger
|
||||
<$> PG.withQ
|
||||
[PG.sql|
|
||||
()
|
||||
False
|
||||
eventTriggers <-
|
||||
map uncurryEventTrigger
|
||||
<$> PG.withQE
|
||||
defaultTxErrorHandler
|
||||
[PG.sql|
|
||||
SELECT e.name, e.definition::json, e.webhook, e.num_retries, e.retry_interval, e.headers::json
|
||||
FROM hdb_catalog.event_triggers e |]
|
||||
()
|
||||
False
|
||||
forM_ eventTriggers updateEventTrigger3To4
|
||||
PG.unitQ
|
||||
[PG.sql|
|
||||
()
|
||||
False
|
||||
forM_ eventTriggers updateEventTrigger3To4
|
||||
PG.unitQE
|
||||
defaultTxErrorHandler
|
||||
[PG.sql|
|
||||
ALTER TABLE hdb_catalog.event_triggers
|
||||
DROP COLUMN definition,
|
||||
DROP COLUMN query,
|
||||
@ -62,8 +64,8 @@ from3To4 = liftTx $
|
||||
DROP COLUMN retry_interval,
|
||||
DROP COLUMN headers,
|
||||
DROP COLUMN metadataTransform|]
|
||||
()
|
||||
False
|
||||
()
|
||||
False
|
||||
where
|
||||
uncurryEventTrigger ::
|
||||
( TriggerName,
|
||||
@ -77,7 +79,8 @@ from3To4 = liftTx $
|
||||
uncurryEventTrigger (trn, PG.ViaJSON tDef, w, nr, rint, PG.ViaJSON headers) =
|
||||
EventTriggerConf trn tDef (Just w) Nothing (RetryConf nr rint Nothing) headers Nothing Nothing Nothing TORDisableTrigger
|
||||
updateEventTrigger3To4 etc@(EventTriggerConf name _ _ _ _ _ _ _ _ _) =
|
||||
PG.unitQ
|
||||
PG.unitQE
|
||||
defaultTxErrorHandler
|
||||
[PG.sql|
|
||||
UPDATE hdb_catalog.event_triggers
|
||||
SET
|
||||
|
Loading…
Reference in New Issue
Block a user