mirror of
https://github.com/nikita-volkov/hasql.git
synced 2024-11-22 01:52:45 +03:00
Implement pipeline errors
This commit is contained in:
parent
5e8147b737
commit
8f5d6f7e81
@ -138,7 +138,7 @@ library
|
||||
text >=1 && <3,
|
||||
text-builder >=0.6.7 && <0.7,
|
||||
time >=1.9 && <2,
|
||||
transformers >=0.3 && <0.7,
|
||||
transformers >=0.6 && <0.7,
|
||||
uuid >=1.3 && <2,
|
||||
vector >=0.10 && <0.14,
|
||||
|
||||
|
@ -17,11 +17,11 @@ import Hasql.Prelude hiding (many, maybe)
|
||||
import Hasql.Prelude qualified as Prelude
|
||||
|
||||
newtype Results a
|
||||
= Results (ReaderT (Bool, LibPQ.Connection) (ExceptT QueryError IO) a)
|
||||
= Results (ReaderT (Bool, LibPQ.Connection) (ExceptT CommandError IO) a)
|
||||
deriving (Functor, Applicative, Monad)
|
||||
|
||||
{-# INLINE run #-}
|
||||
run :: Results a -> LibPQ.Connection -> Bool -> IO (Either QueryError a)
|
||||
run :: Results a -> LibPQ.Connection -> Bool -> IO (Either CommandError a)
|
||||
run (Results stack) conn idt =
|
||||
runExceptT (runReaderT stack (idt, conn))
|
||||
|
||||
@ -32,7 +32,7 @@ clientError =
|
||||
$ ReaderT
|
||||
$ \(_, connection) ->
|
||||
ExceptT
|
||||
$ fmap (Left . ClientQueryError) (LibPQ.errorMessage connection)
|
||||
$ fmap (Left . ClientCommandError) (LibPQ.errorMessage connection)
|
||||
|
||||
-- |
|
||||
-- Parse a single result.
|
||||
@ -45,9 +45,9 @@ single resultDec =
|
||||
resultMaybe <- LibPQ.getResult connection
|
||||
case resultMaybe of
|
||||
Just result ->
|
||||
mapLeft ResultQueryError <$> Result.run resultDec integerDatetimes result
|
||||
mapLeft ResultCommandError <$> Result.run resultDec integerDatetimes result
|
||||
Nothing ->
|
||||
fmap (Left . ClientQueryError) (LibPQ.errorMessage connection)
|
||||
fmap (Left . ClientCommandError) (LibPQ.errorMessage connection)
|
||||
|
||||
-- |
|
||||
-- Fetch a single result.
|
||||
@ -60,7 +60,7 @@ getResult =
|
||||
resultMaybe <- LibPQ.getResult connection
|
||||
case resultMaybe of
|
||||
Just result -> pure (Right result)
|
||||
Nothing -> fmap (Left . ClientQueryError) (LibPQ.errorMessage connection)
|
||||
Nothing -> fmap (Left . ClientCommandError) (LibPQ.errorMessage connection)
|
||||
|
||||
-- |
|
||||
-- Fetch a single result.
|
||||
@ -84,11 +84,11 @@ dropRemainders =
|
||||
loop integerDatetimes connection <* checkErrors
|
||||
where
|
||||
checkErrors =
|
||||
ExceptT $ fmap (mapLeft ResultQueryError) $ Result.run Result.noResult integerDatetimes result
|
||||
ExceptT $ fmap (mapLeft ResultCommandError) $ Result.run Result.noResult integerDatetimes result
|
||||
|
||||
refine :: (a -> Either Text b) -> Results a -> Results b
|
||||
refine refiner (Results stack) = Results
|
||||
$ ReaderT
|
||||
$ \env -> ExceptT $ do
|
||||
resultEither <- runExceptT $ runReaderT stack env
|
||||
return $ resultEither >>= mapLeft (ResultQueryError . UnexpectedResultError) . refiner
|
||||
return $ resultEither >>= mapLeft (ResultCommandError . UnexpectedResultError) . refiner
|
||||
|
@ -13,8 +13,12 @@ data SessionError
|
||||
ByteString
|
||||
-- | Parameters rendered as human-readable SQL literals.
|
||||
[Text]
|
||||
-- | Error details
|
||||
QueryError
|
||||
-- | Error details.
|
||||
CommandError
|
||||
| -- | Error during the execution of a pipeline.
|
||||
PipelineSessionError
|
||||
-- | Error details.
|
||||
CommandError
|
||||
deriving (Show, Eq, Typeable)
|
||||
|
||||
instance Exception SessionError where
|
||||
@ -22,8 +26,8 @@ instance Exception SessionError where
|
||||
QuerySessionError query params commandError ->
|
||||
let queryContext :: Maybe (ByteString, Int)
|
||||
queryContext = case commandError of
|
||||
ClientQueryError _ -> Nothing
|
||||
ResultQueryError resultError -> case resultError of
|
||||
ClientCommandError _ -> Nothing
|
||||
ResultCommandError resultError -> case resultError of
|
||||
ServerResultError _ message _ _ (Just position) -> Just (message, position)
|
||||
_ -> Nothing
|
||||
|
||||
@ -63,35 +67,39 @@ instance Exception SessionError where
|
||||
<> "\n"
|
||||
<> "\n Params: "
|
||||
<> show params
|
||||
<> "\n Error: "
|
||||
<> case commandError of
|
||||
ClientQueryError (Just message) -> "Client error: " <> show message
|
||||
ClientQueryError Nothing -> "Client error without details"
|
||||
ResultQueryError resultError -> case resultError of
|
||||
ServerResultError code message details hint position ->
|
||||
"Server error "
|
||||
<> BC.unpack code
|
||||
<> ": "
|
||||
<> BC.unpack message
|
||||
<> maybe "" (\d -> "\n Details: " <> BC.unpack d) details
|
||||
<> maybe "" (\h -> "\n Hint: " <> BC.unpack h) hint
|
||||
UnexpectedResultError message -> "Unexpected result: " <> show message
|
||||
RowResultError row (ColumnRowError column rowError) ->
|
||||
"Row error: " <> show row <> ":" <> show column <> " " <> show rowError
|
||||
UnexpectedAmountOfRowsResultError amount ->
|
||||
"Unexpected amount of rows: " <> show amount
|
||||
<> "\n Reason: "
|
||||
<> renderCommandErrorAsReason commandError
|
||||
PipelineSessionError commandError ->
|
||||
"PipelineSessionError!\n Reason: " <> renderCommandErrorAsReason commandError
|
||||
where
|
||||
renderCommandErrorAsReason = \case
|
||||
ClientCommandError (Just message) -> "Client error: " <> show message
|
||||
ClientCommandError Nothing -> "Client error without details"
|
||||
ResultCommandError resultError -> case resultError of
|
||||
ServerResultError code message details hint position ->
|
||||
"Server error "
|
||||
<> BC.unpack code
|
||||
<> ": "
|
||||
<> BC.unpack message
|
||||
<> maybe "" (\d -> "\n Details: " <> BC.unpack d) details
|
||||
<> maybe "" (\h -> "\n Hint: " <> BC.unpack h) hint
|
||||
UnexpectedResultError message -> "Unexpected result: " <> show message
|
||||
RowResultError row (ColumnRowError column rowError) ->
|
||||
"Row error: " <> show row <> ":" <> show column <> " " <> show rowError
|
||||
UnexpectedAmountOfRowsResultError amount ->
|
||||
"Unexpected amount of rows: " <> show amount
|
||||
|
||||
-- |
|
||||
-- An error of some command in the session.
|
||||
data QueryError
|
||||
data CommandError
|
||||
= -- |
|
||||
-- An error on the client-side,
|
||||
-- with a message generated by the \"libpq\" driver.
|
||||
-- Usually indicates problems with connection.
|
||||
ClientQueryError (Maybe ByteString)
|
||||
ClientCommandError (Maybe ByteString)
|
||||
| -- |
|
||||
-- Some error with a command result.
|
||||
ResultQueryError ResultError
|
||||
ResultCommandError ResultError
|
||||
deriving (Show, Eq)
|
||||
|
||||
-- |
|
||||
|
@ -56,7 +56,7 @@ initConnection c =
|
||||
void $ LibPQ.exec c (Commands.asBytes (Commands.setEncodersToUTF8 <> Commands.setMinClientMessagesToWarning))
|
||||
|
||||
{-# INLINE getResults #-}
|
||||
getResults :: LibPQ.Connection -> Bool -> ResultsDecoders.Results a -> IO (Either QueryError a)
|
||||
getResults :: LibPQ.Connection -> Bool -> ResultsDecoders.Results a -> IO (Either CommandError a)
|
||||
getResults connection integerDatetimes decoder =
|
||||
{-# SCC "getResults" #-}
|
||||
(<*) <$> get <*> dropRemainders
|
||||
@ -72,7 +72,7 @@ getPreparedStatementKey ::
|
||||
PreparedStatementRegistry.PreparedStatementRegistry ->
|
||||
ByteString ->
|
||||
[LibPQ.Oid] ->
|
||||
IO (Either QueryError ByteString)
|
||||
IO (Either CommandError ByteString)
|
||||
getPreparedStatementKey connection registry template oidList =
|
||||
{-# SCC "getPreparedStatementKey" #-}
|
||||
PreparedStatementRegistry.update localKey onNewRemoteKey onOldRemoteKey registry
|
||||
@ -96,10 +96,10 @@ getPreparedStatementKey connection registry template oidList =
|
||||
pure (pure key)
|
||||
|
||||
{-# INLINE checkedSend #-}
|
||||
checkedSend :: LibPQ.Connection -> IO Bool -> IO (Either QueryError ())
|
||||
checkedSend :: LibPQ.Connection -> IO Bool -> IO (Either CommandError ())
|
||||
checkedSend connection send =
|
||||
send >>= \case
|
||||
False -> fmap (Left . ClientQueryError) $ LibPQ.errorMessage connection
|
||||
False -> fmap (Left . ClientCommandError) $ LibPQ.errorMessage connection
|
||||
True -> pure (Right ())
|
||||
|
||||
{-# INLINE sendPreparedParametricStatement #-}
|
||||
@ -110,7 +110,7 @@ sendPreparedParametricStatement ::
|
||||
ByteString ->
|
||||
ParamsEncoders.Params a ->
|
||||
a ->
|
||||
IO (Either QueryError ())
|
||||
IO (Either CommandError ())
|
||||
sendPreparedParametricStatement connection registry integerDatetimes template encoder input =
|
||||
runExceptT $ do
|
||||
key <- ExceptT $ getPreparedStatementKey connection registry template oidList
|
||||
@ -126,7 +126,7 @@ sendUnpreparedParametricStatement ::
|
||||
ByteString ->
|
||||
ParamsEncoders.Params a ->
|
||||
a ->
|
||||
IO (Either QueryError ())
|
||||
IO (Either CommandError ())
|
||||
sendUnpreparedParametricStatement connection integerDatetimes template encoder input =
|
||||
checkedSend connection
|
||||
$ LibPQ.sendQueryParams
|
||||
@ -144,7 +144,7 @@ sendParametricStatement ::
|
||||
ParamsEncoders.Params a ->
|
||||
Bool ->
|
||||
a ->
|
||||
IO (Either QueryError ())
|
||||
IO (Either CommandError ())
|
||||
sendParametricStatement connection integerDatetimes registry template encoder prepared params =
|
||||
{-# SCC "sendParametricStatement" #-}
|
||||
if prepared
|
||||
@ -152,6 +152,6 @@ sendParametricStatement connection integerDatetimes registry template encoder pr
|
||||
else sendUnpreparedParametricStatement connection integerDatetimes template encoder params
|
||||
|
||||
{-# INLINE sendNonparametricStatement #-}
|
||||
sendNonparametricStatement :: LibPQ.Connection -> ByteString -> IO (Either QueryError ())
|
||||
sendNonparametricStatement :: LibPQ.Connection -> ByteString -> IO (Either CommandError ())
|
||||
sendNonparametricStatement connection sql =
|
||||
checkedSend connection $ LibPQ.sendQuery connection sql
|
||||
|
@ -1,41 +1,54 @@
|
||||
{-# OPTIONS_GHC -Wno-unused-imports -Wno-unused-binds #-}
|
||||
|
||||
module Hasql.Pipeline.Core where
|
||||
|
||||
import Database.PostgreSQL.LibPQ qualified as Pq
|
||||
import Hasql.Connection.Core qualified as Connection
|
||||
import Hasql.Decoders.All qualified as Decoders
|
||||
import Hasql.Decoders.Result qualified as Decoders.Result
|
||||
import Hasql.Decoders.Results qualified as Decoders.Results
|
||||
import Hasql.Encoders.All qualified as Encoders
|
||||
import Hasql.Encoders.Params qualified as Encoders.Params
|
||||
import Hasql.Errors
|
||||
import Hasql.IO qualified as IO
|
||||
import Hasql.Prelude
|
||||
import Hasql.PreparedStatementRegistry qualified as PreparedStatementRegistry
|
||||
import Hasql.Statement qualified as Statement
|
||||
|
||||
run :: Pipeline a -> Connection.Connection -> IO (Either SessionError a)
|
||||
run (Pipeline send) (Connection.Connection pqConnectionRef integerDatetimes registry) =
|
||||
withMVar pqConnectionRef \pqConnection -> do
|
||||
runCommandFailing pqConnection $ Pq.enterPipelineMode pqConnection
|
||||
sendResult <- send pqConnection registry integerDatetimes
|
||||
case sendResult of
|
||||
Left err -> do
|
||||
pure (Left err)
|
||||
Right recv -> do
|
||||
runCommandFailing pqConnection $ Pq.pipelineSync pqConnection
|
||||
recvResult <- recv
|
||||
handleEither =<< Decoders.Results.run (Decoders.Results.single Decoders.Result.pipelineSync) pqConnection integerDatetimes
|
||||
runCommandFailing pqConnection $ Pq.exitPipelineMode pqConnection
|
||||
pure recvResult
|
||||
run :: forall a. Pipeline a -> Pq.Connection -> PreparedStatementRegistry.PreparedStatementRegistry -> Bool -> IO (Either SessionError a)
|
||||
run (Pipeline sendQueriesInIO) connection registry integerDatetimes = do
|
||||
runExceptT do
|
||||
enterPipelineMode
|
||||
flip finallyE exitPipelineMode do
|
||||
recvQueries <- sendQueries
|
||||
pipelineSync
|
||||
queriesResult <- recvQueries
|
||||
recvPipelineSync
|
||||
pure queriesResult
|
||||
where
|
||||
runCommandFailing :: Pq.Connection -> IO Bool -> IO ()
|
||||
runCommandFailing pqConn runCmd =
|
||||
IO.checkedSend pqConn runCmd >>= handleEither
|
||||
handleEither = \case
|
||||
Right a -> pure a
|
||||
Left err -> fail $ show err
|
||||
enterPipelineMode :: ExceptT SessionError IO ()
|
||||
enterPipelineMode =
|
||||
runCommand $ Pq.enterPipelineMode connection
|
||||
|
||||
exitPipelineMode :: ExceptT SessionError IO ()
|
||||
exitPipelineMode =
|
||||
runCommand $ Pq.exitPipelineMode connection
|
||||
|
||||
sendQueries :: ExceptT SessionError IO (ExceptT SessionError IO a)
|
||||
sendQueries =
|
||||
fmap ExceptT $ ExceptT $ sendQueriesInIO connection registry integerDatetimes
|
||||
|
||||
pipelineSync :: ExceptT SessionError IO ()
|
||||
pipelineSync =
|
||||
runCommand $ Pq.pipelineSync connection
|
||||
|
||||
recvPipelineSync :: ExceptT SessionError IO ()
|
||||
recvPipelineSync =
|
||||
ExceptT
|
||||
$ fmap (mapLeft PipelineSessionError)
|
||||
$ Decoders.Results.run (Decoders.Results.single Decoders.Result.pipelineSync) connection integerDatetimes
|
||||
|
||||
runCommand :: IO Bool -> ExceptT SessionError IO ()
|
||||
runCommand action =
|
||||
lift action >>= \case
|
||||
True -> pure ()
|
||||
False -> ExceptT (Left . PipelineSessionError . ClientCommandError <$> Pq.errorMessage connection)
|
||||
|
||||
newtype Pipeline a
|
||||
= Pipeline
|
||||
@ -89,7 +102,7 @@ statement params (Statement.Statement sql (Encoders.Params encoder) (Decoders.Re
|
||||
sent <- Pq.sendPrepare connection key sql (mfilter (not . null) (Just oidList))
|
||||
if sent
|
||||
then pure (True, Right (key, recv))
|
||||
else (False,) . Left . commandToSessionError . ClientQueryError <$> Pq.errorMessage connection
|
||||
else (False,) . Left . commandToSessionError . ClientCommandError <$> Pq.errorMessage connection
|
||||
where
|
||||
recv =
|
||||
fmap (mapLeft commandToSessionError)
|
||||
@ -101,7 +114,7 @@ statement params (Statement.Statement sql (Encoders.Params encoder) (Decoders.Re
|
||||
|
||||
sendQuery key =
|
||||
Pq.sendQueryPrepared connection key valueAndFormatList Pq.Binary >>= \case
|
||||
False -> Left . commandToSessionError . ClientQueryError <$> Pq.errorMessage connection
|
||||
False -> Left . commandToSessionError . ClientCommandError <$> Pq.errorMessage connection
|
||||
True -> pure (Right recv)
|
||||
where
|
||||
recv =
|
||||
@ -112,7 +125,7 @@ statement params (Statement.Statement sql (Encoders.Params encoder) (Decoders.Re
|
||||
|
||||
runUnprepared =
|
||||
Pq.sendQueryParams connection sql (Encoders.Params.compileUnpreparedStatementData encoder integerDatetimes params) Pq.Binary >>= \case
|
||||
False -> Left . commandToSessionError . ClientQueryError <$> Pq.errorMessage connection
|
||||
False -> Left . commandToSessionError . ClientCommandError <$> Pq.errorMessage connection
|
||||
True -> pure (Right recv)
|
||||
where
|
||||
recv =
|
||||
|
@ -25,7 +25,7 @@ import Control.Monad.Reader.Class as Exports (MonadReader (..))
|
||||
import Control.Monad.ST as Exports
|
||||
import Control.Monad.Trans.Class as Exports
|
||||
import Control.Monad.Trans.Cont as Exports hiding (callCC, shift)
|
||||
import Control.Monad.Trans.Except as Exports (Except, ExceptT (ExceptT), catchE, except, mapExcept, mapExceptT, runExcept, runExceptT, throwE, withExcept, withExceptT)
|
||||
import Control.Monad.Trans.Except as Exports (Except, ExceptT (ExceptT), catchE, except, finallyE, mapExcept, mapExceptT, runExcept, runExceptT, throwE, withExcept, withExceptT)
|
||||
import Control.Monad.Trans.Maybe as Exports
|
||||
import Control.Monad.Trans.Reader as Exports (Reader, ReaderT (ReaderT), mapReader, mapReaderT, runReader, runReaderT, withReader, withReaderT)
|
||||
import Control.Monad.Trans.State.Strict as Exports (State, StateT (StateT), evalState, evalStateT, execState, execStateT, mapState, mapStateT, runState, runStateT, withState, withStateT)
|
||||
|
@ -62,4 +62,6 @@ statement input (Statement.Statement template (Encoders.Params paramsEncoder) (D
|
||||
|
||||
pipeline :: Pipeline.Pipeline result -> Session result
|
||||
pipeline pipeline =
|
||||
Session $ ReaderT $ ExceptT . Pipeline.run pipeline
|
||||
Session $ ReaderT \(Connection.Connection pqConnectionRef integerDatetimes registry) ->
|
||||
ExceptT $ withMVar pqConnectionRef \pqConnection ->
|
||||
Pipeline.run pipeline pqConnection registry integerDatetimes
|
||||
|
@ -218,7 +218,7 @@ tree =
|
||||
where
|
||||
resultTest =
|
||||
\case
|
||||
Right (Left (Session.QuerySessionError _ _ (Session.ResultQueryError (Session.ServerResultError "26000" _ _ _ _)))) -> False
|
||||
Right (Left (Session.QuerySessionError _ _ (Session.ResultCommandError (Session.ServerResultError "26000" _ _ _ _)))) -> False
|
||||
_ -> True
|
||||
session =
|
||||
catchError session (const (pure ())) *> session
|
||||
|
@ -2,7 +2,7 @@ module Hasql.TestingUtils.TestingDsl
|
||||
( Session.Session,
|
||||
Error (..),
|
||||
Session.SessionError (..),
|
||||
Session.QueryError (..),
|
||||
Session.CommandError (..),
|
||||
Pipeline.Pipeline,
|
||||
Statement.Statement (..),
|
||||
runSessionOnLocalDb,
|
||||
|
Loading…
Reference in New Issue
Block a user