Merge pull request #160 from nikita-volkov/pipelining

Pipelining mode
This commit is contained in:
Nikita Volkov 2024-04-28 13:18:08 +03:00 committed by GitHub
commit 5b6dd9dfdc
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
31 changed files with 947 additions and 257 deletions

26
.github/workflows/on-pr.yaml vendored Normal file
View File

@ -0,0 +1,26 @@
on:
pull_request:
types: [assigned, opened, synchronize, reopened, labeled, unlabeled]
branches:
- master
- major
- minor
- patch
jobs:
format:
uses: nikita-volkov/haskell-hackage-lib-github-actions-workflows/.github/workflows/format.yaml@v3
secrets: inherit
check:
uses: ./.github/workflows/check.yaml
secrets: inherit
check-changelog:
name: Check Changelog Action
runs-on: ubuntu-20.04
steps:
- uses: tarides/changelog-check-action@v2
with:
changelog: CHANGELOG.md

View File

@ -2,7 +2,6 @@ on:
push:
branches:
- master
pull_request:
jobs:

View File

@ -1,6 +1,8 @@
# 1.7
- Decidable instance on `Encoders.Params` removed. It was useless and limited the design.
- `QueryError` type renamed to `SessionError`.
- `PipelineError` constructor added to the `SessionError` type.
# 1.6.3.1

View File

@ -4,6 +4,7 @@ import Criterion
import Criterion.Main
import Hasql.Connection qualified as A
import Hasql.Decoders qualified as D
import Hasql.Pipeline qualified as E
import Hasql.Session qualified as B
import Hasql.Statement qualified as C
import Prelude
@ -21,41 +22,43 @@ main =
[ sessionBench "largeResultInVector" sessionWithSingleLargeResultInVector,
sessionBench "largeResultInList" sessionWithSingleLargeResultInList,
sessionBench "manyLargeResults" sessionWithManyLargeResults,
sessionBench "manySmallResults" sessionWithManySmallResults
sessionBench "manyLargeResultsViaPipeline" sessionWithManyLargeResultsViaPipeline,
sessionBench "manySmallResults" sessionWithManySmallResults,
sessionBench "manySmallResultsViaPipeline" sessionWithManySmallResultsViaPipeline
]
where
sessionBench :: (NFData a) => String -> B.Session a -> Benchmark
sessionBench name session =
bench name (nfIO (fmap (either (error "") id) (B.run session connection)))
bench name (nfIO (B.run session connection >>= either (fail . show) pure))
-- * Sessions
sessionWithManySmallParameters :: Vector (Int64, Int64) -> B.Session ()
sessionWithManySmallParameters =
error "TODO: sessionWithManySmallParameters"
sessionWithSingleLargeResultInVector :: B.Session (Vector (Int64, Int64))
sessionWithSingleLargeResultInVector =
B.statement () statementWithManyRowsInVector
sessionWithManyLargeResults :: B.Session [Vector (Int64, Int64)]
sessionWithManyLargeResults =
replicateM 1000 (B.statement () statementWithManyRowsInVector)
sessionWithSingleLargeResultInList :: B.Session [(Int64, Int64)]
sessionWithSingleLargeResultInList =
B.statement () statementWithManyRowsInList
sessionWithManyLargeResults :: B.Session [Vector (Int64, Int64)]
sessionWithManyLargeResults =
replicateM 100 (B.statement () statementWithManyRowsInVector)
sessionWithManySmallResults :: B.Session [(Int64, Int64)]
sessionWithManySmallResults =
replicateM 1000 (B.statement () statementWithSingleRow)
replicateM 100 (B.statement () statementWithSingleRow)
sessionWithManyLargeResultsViaPipeline :: B.Session [Vector (Int64, Int64)]
sessionWithManyLargeResultsViaPipeline =
B.pipeline (replicateM 100 (E.statement () statementWithManyRowsInVector))
sessionWithManySmallResultsViaPipeline :: B.Session [(Int64, Int64)]
sessionWithManySmallResultsViaPipeline =
B.pipeline (replicateM 100 (E.statement () statementWithSingleRow))
-- * Statements
statementWithManyParameters :: C.Statement (Vector (Int64, Int64)) ()
statementWithManyParameters =
error "TODO: statementWithManyParameters"
statementWithSingleRow :: C.Statement () (Int64, Int64)
statementWithSingleRow =
C.Statement template encoder decoder True

View File

@ -9,6 +9,11 @@ description:
<https://github.com/nikita-volkov/hasql the readme>.
The API comes free from all kinds of exceptions. All error-reporting is explicit and is presented using the 'Either' type.
This library requires to have the \"libpq\" library installed on the running system.
It comes distributed with PostgreSQL.
To be able to use the \"Pipeline\" feature you'll need \"libpq\" of version >14.
This feature does not however put any requirements on the version of the PostgreSQL server.
homepage: https://github.com/nikita-volkov/hasql
bug-reports: https://github.com/nikita-volkov/hasql/issues
author: Nikita Volkov <nikita.y.volkov@mail.ru>
@ -27,8 +32,10 @@ source-repository head
common base
default-language: Haskell2010
default-extensions:
ApplicativeDo
Arrows
BangPatterns
BlockArguments
ConstraintKinds
DataKinds
DefaultSignatures
@ -37,6 +44,7 @@ common base
DeriveFunctor
DeriveGeneric
DeriveTraversable
DerivingVia
EmptyDataDecls
FlexibleContexts
FlexibleInstances
@ -46,13 +54,11 @@ common base
ImportQualifiedPost
LambdaCase
LiberalTypeSynonyms
MagicHash
MultiParamTypeClasses
MultiWayIf
NoImplicitPrelude
NoMonomorphismRestriction
OverloadedStrings
ParallelListComp
PatternGuards
QuasiQuotes
RankNTypes
@ -60,11 +66,10 @@ common base
RoleAnnotations
ScopedTypeVariables
StandaloneDeriving
TemplateHaskell
StrictData
TupleSections
TypeFamilies
TypeOperators
UnboxedTuples
common executable
import: base
@ -88,6 +93,7 @@ library
Hasql.Connection
Hasql.Decoders
Hasql.Encoders
Hasql.Pipeline
Hasql.Session
Hasql.Statement
@ -107,6 +113,10 @@ library
Hasql.Encoders.Value
Hasql.Errors
Hasql.IO
Hasql.LibPq14
Hasql.LibPq14.Ffi
Hasql.LibPq14.Mappings
Hasql.Pipeline.Core
Hasql.PostgresTypeInfo
Hasql.Prelude
Hasql.PreparedStatementRegistry
@ -126,22 +136,25 @@ library
mtl >=2 && <3,
network-ip >=0.3.0.3 && <0.4,
postgresql-binary >=0.13.1 && <0.14,
postgresql-libpq >=0.9 && <0.11,
postgresql-libpq ==0.10.1.0,
profunctors >=5.1 && <6,
scientific >=0.3 && <0.4,
text >=1 && <3,
text-builder >=0.6.7 && <0.7,
time >=1.9 && <2,
transformers >=0.3 && <0.7,
transformers >=0.6 && <0.7,
uuid >=1.3 && <2,
vector >=0.10 && <0.14,
library testing-utils
library testing-kit
import: base
hs-source-dirs: testing-utils
hs-source-dirs: testing-kit
exposed-modules:
Hasql.TestingUtils.Constants
Hasql.TestingUtils.TestingDsl
Hasql.TestingKit.Constants
Hasql.TestingKit.Statements.BrokenSyntax
Hasql.TestingKit.Statements.GenerateSeries
Hasql.TestingKit.Statements.WrongDecoder
Hasql.TestingKit.TestingDsl
build-depends:
hasql,
@ -160,7 +173,7 @@ test-suite tasty
build-depends:
contravariant-extras >=0.3.5.2 && <0.4,
hasql,
hasql:testing-utils,
hasql:testing-kit,
quickcheck-instances >=0.3.11 && <0.4,
rerebase <2,
tasty >=0.12 && <2,
@ -200,3 +213,17 @@ test-suite profiling
build-depends:
hasql,
rerebase >=1 && <2,
test-suite hspec
import: test
type: exitcode-stdio-1.0
hs-source-dirs: hspec
main-is: Main.hs
other-modules:
Hasql.PipelineSpec
build-tool-depends: hspec-discover:hspec-discover
build-depends:
hasql:testing-kit,
hspec,
rerebase >=1 && <2,

View File

@ -0,0 +1,91 @@
module Hasql.PipelineSpec (spec) where
import Hasql.TestingKit.Statements.BrokenSyntax qualified as BrokenSyntax
import Hasql.TestingKit.Statements.GenerateSeries qualified as GenerateSeries
import Hasql.TestingKit.Statements.WrongDecoder qualified as WrongDecoder
import Hasql.TestingKit.TestingDsl qualified as Dsl
import Test.Hspec
import Prelude
spec :: Spec
spec = do
describe "Single-statement" do
describe "Unprepared" do
it "Collects results and sends params" do
result <-
Dsl.runPipelineOnLocalDb
$ GenerateSeries.pipeline False GenerateSeries.Params {start = 0, end = 2}
shouldBe result (Right [0 .. 2])
describe "Prepared" do
it "Collects results and sends params" do
result <-
Dsl.runPipelineOnLocalDb
$ GenerateSeries.pipeline True GenerateSeries.Params {start = 0, end = 2}
shouldBe result (Right [0 .. 2])
describe "Multi-statement" do
describe "On unprepared statements" do
it "Collects results and sends params" do
result <-
Dsl.runPipelineOnLocalDb
$ replicateM 2
$ GenerateSeries.pipeline False GenerateSeries.Params {start = 0, end = 2}
shouldBe result (Right [[0 .. 2], [0 .. 2]])
describe "On prepared statements" do
it "Collects results and sends params" do
result <-
Dsl.runPipelineOnLocalDb
$ replicateM 2
$ GenerateSeries.pipeline True GenerateSeries.Params {start = 0, end = 2}
shouldBe result (Right [[0 .. 2], [0 .. 2]])
describe "When a part in the middle fails" do
describe "With query error" do
it "Captures the error" do
result <-
Dsl.runPipelineOnLocalDb
$ (,,)
<$> GenerateSeries.pipeline True GenerateSeries.Params {start = 0, end = 2}
<*> BrokenSyntax.pipeline True BrokenSyntax.Params {start = 0, end = 2}
<*> GenerateSeries.pipeline True GenerateSeries.Params {start = 0, end = 2}
case result of
Left (Dsl.SessionError (Dsl.QueryError _ _ _)) -> pure ()
_ -> expectationFailure $ "Unexpected result: " <> show result
it "Leaves the connection usable" do
result <-
Dsl.runSessionOnLocalDb do
tryError
$ Dsl.runPipelineInSession
$ (,,)
<$> GenerateSeries.pipeline True GenerateSeries.Params {start = 0, end = 2}
<*> BrokenSyntax.pipeline True BrokenSyntax.Params {start = 0, end = 2}
<*> GenerateSeries.pipeline True GenerateSeries.Params {start = 0, end = 2}
GenerateSeries.session True GenerateSeries.Params {start = 0, end = 0}
shouldBe result (Right [0])
describe "With decoding error" do
it "Captures the error" do
result <-
Dsl.runPipelineOnLocalDb
$ (,,)
<$> GenerateSeries.pipeline True GenerateSeries.Params {start = 0, end = 2}
<*> WrongDecoder.pipeline True WrongDecoder.Params {start = 0, end = 2}
<*> GenerateSeries.pipeline True GenerateSeries.Params {start = 0, end = 2}
case result of
Left (Dsl.SessionError (Dsl.QueryError _ _ _)) -> pure ()
_ -> expectationFailure $ "Unexpected result: " <> show result
it "Leaves the connection usable" do
result <-
Dsl.runSessionOnLocalDb do
tryError
$ Dsl.runPipelineInSession
$ (,,)
<$> GenerateSeries.pipeline True GenerateSeries.Params {start = 0, end = 2}
<*> WrongDecoder.pipeline True WrongDecoder.Params {start = 0, end = 2}
<*> GenerateSeries.pipeline True GenerateSeries.Params {start = 0, end = 2}
GenerateSeries.session True GenerateSeries.Params {start = 0, end = 0}
shouldBe result (Right [0])

1
hspec/Main.hs Normal file
View File

@ -0,0 +1 @@
{-# OPTIONS_GHC -F -pgmF hspec-discover #-}

View File

@ -2,8 +2,8 @@
-- This module provides a low-level effectful API dealing with the connections to the database.
module Hasql.Connection.Core where
import Database.PostgreSQL.LibPQ qualified as LibPQ
import Hasql.IO qualified as IO
import Hasql.LibPq14 qualified as LibPQ
import Hasql.Prelude
import Hasql.PreparedStatementRegistry qualified as PreparedStatementRegistry
import Hasql.Settings qualified as Settings

View File

@ -4,9 +4,9 @@ import Data.Attoparsec.ByteString.Char8 qualified as Attoparsec
import Data.ByteString qualified as ByteString
import Data.Vector qualified as Vector
import Data.Vector.Mutable qualified as MutableVector
import Database.PostgreSQL.LibPQ qualified as LibPQ
import Hasql.Decoders.Row qualified as Row
import Hasql.Errors
import Hasql.LibPq14 qualified as LibPQ
import Hasql.Prelude hiding (many, maybe)
import Hasql.Prelude qualified as Prelude
@ -15,25 +15,25 @@ newtype Result a
deriving (Functor, Applicative, Monad)
{-# INLINE run #-}
run :: Result a -> (Bool, LibPQ.Result) -> IO (Either ResultError a)
run (Result reader) env =
runExceptT (runReaderT reader env)
run :: Result a -> Bool -> LibPQ.Result -> IO (Either ResultError a)
run (Result reader) idt result =
runExceptT (runReaderT reader (idt, result))
{-# INLINE pipelineSync #-}
pipelineSync :: Result ()
pipelineSync =
checkExecStatus [LibPQ.PipelineSync]
{-# INLINE noResult #-}
noResult :: Result ()
noResult =
checkExecStatus $ \case
LibPQ.CommandOk -> True
LibPQ.TuplesOk -> True
_ -> False
checkExecStatus [LibPQ.CommandOk, LibPQ.TuplesOk]
{-# INLINE rowsAffected #-}
rowsAffected :: Result Int64
rowsAffected =
do
checkExecStatus $ \case
LibPQ.CommandOk -> True
_ -> False
checkExecStatus [LibPQ.CommandOk]
Result
$ ReaderT
$ \(_, result) ->
@ -51,22 +51,26 @@ rowsAffected =
then Left (UnexpectedResult "Empty bytes")
else Right bytes
decimal bytes =
mapLeft (\m -> UnexpectedResult ("Decimal parsing failure: " <> fromString m))
first (\m -> UnexpectedResult ("Decimal parsing failure: " <> fromString m))
$ Attoparsec.parseOnly (Attoparsec.decimal <* Attoparsec.endOfInput) bytes
{-# INLINE checkExecStatus #-}
checkExecStatus :: (LibPQ.ExecStatus -> Bool) -> Result ()
checkExecStatus predicate =
checkExecStatus :: [LibPQ.ExecStatus] -> Result ()
checkExecStatus expectedList =
{-# SCC "checkExecStatus" #-}
do
status <- Result $ ReaderT $ \(_, result) -> lift $ LibPQ.resultStatus result
unless (predicate status) $ do
unless (elem status expectedList) $ do
case status of
LibPQ.BadResponse -> serverError
LibPQ.NonfatalError -> serverError
LibPQ.FatalError -> serverError
LibPQ.EmptyQuery -> return ()
_ -> Result $ lift $ ExceptT $ pure $ Left $ UnexpectedResult $ "Unexpected result status: " <> (fromString $ show status)
_ -> unexpectedResult $ "Unexpected result status: " <> fromString (show status) <> ". Expecting one of the following: " <> fromString (show expectedList)
unexpectedResult :: Text -> Result a
unexpectedResult =
Result . lift . ExceptT . pure . Left . UnexpectedResult
{-# INLINE serverError #-}
serverError :: Result ()
@ -99,9 +103,7 @@ serverError =
maybe :: Row.Row a -> Result (Maybe a)
maybe rowDec =
do
checkExecStatus $ \case
LibPQ.TuplesOk -> True
_ -> False
checkExecStatus [LibPQ.TuplesOk]
Result
$ ReaderT
$ \(integerDatetimes, result) -> ExceptT $ do
@ -111,7 +113,7 @@ maybe rowDec =
1 -> do
maxCols <- LibPQ.nfields result
let fromRowError (col, err) = RowError 0 col err
fmap (fmap Just . mapLeft fromRowError) $ Row.run rowDec (result, 0, maxCols, integerDatetimes)
fmap (fmap Just . first fromRowError) $ Row.run rowDec (result, 0, maxCols, integerDatetimes)
_ -> return (Left (UnexpectedAmountOfRows (rowToInt maxRows)))
where
rowToInt (LibPQ.Row n) =
@ -121,9 +123,7 @@ maybe rowDec =
single :: Row.Row a -> Result a
single rowDec =
do
checkExecStatus $ \case
LibPQ.TuplesOk -> True
_ -> False
checkExecStatus [LibPQ.TuplesOk]
Result
$ ReaderT
$ \(integerDatetimes, result) -> ExceptT $ do
@ -132,7 +132,7 @@ single rowDec =
1 -> do
maxCols <- LibPQ.nfields result
let fromRowError (col, err) = RowError 0 col err
fmap (mapLeft fromRowError) $ Row.run rowDec (result, 0, maxCols, integerDatetimes)
fmap (first fromRowError) $ Row.run rowDec (result, 0, maxCols, integerDatetimes)
_ -> return (Left (UnexpectedAmountOfRows (rowToInt maxRows)))
where
rowToInt (LibPQ.Row n) =
@ -142,9 +142,7 @@ single rowDec =
vector :: Row.Row a -> Result (Vector a)
vector rowDec =
do
checkExecStatus $ \case
LibPQ.TuplesOk -> True
_ -> False
checkExecStatus [LibPQ.TuplesOk]
Result
$ ReaderT
$ \(integerDatetimes, result) -> ExceptT $ do
@ -171,9 +169,7 @@ foldl :: (a -> b -> a) -> a -> Row.Row b -> Result a
foldl step init rowDec =
{-# SCC "foldl" #-}
do
checkExecStatus $ \case
LibPQ.TuplesOk -> True
_ -> False
checkExecStatus [LibPQ.TuplesOk]
Result
$ ReaderT
$ \(integerDatetimes, result) ->
@ -203,9 +199,7 @@ foldr :: (b -> a -> a) -> a -> Row.Row b -> Result a
foldr step init rowDec =
{-# SCC "foldr" #-}
do
checkExecStatus $ \case
LibPQ.TuplesOk -> True
_ -> False
checkExecStatus [LibPQ.TuplesOk]
Result
$ ReaderT
$ \(integerDatetimes, result) -> ExceptT $ do

View File

@ -10,9 +10,9 @@
-- * Row-by-row fetching.
module Hasql.Decoders.Results where
import Database.PostgreSQL.LibPQ qualified as LibPQ
import Hasql.Decoders.Result qualified as Result
import Hasql.Errors
import Hasql.LibPq14 qualified as LibPQ
import Hasql.Prelude hiding (many, maybe)
import Hasql.Prelude qualified as Prelude
@ -21,9 +21,9 @@ newtype Results a
deriving (Functor, Applicative, Monad)
{-# INLINE run #-}
run :: Results a -> (Bool, LibPQ.Connection) -> IO (Either CommandError a)
run (Results stack) env =
runExceptT (runReaderT stack env)
run :: Results a -> LibPQ.Connection -> Bool -> IO (Either CommandError a)
run (Results stack) conn idt =
runExceptT (runReaderT stack (idt, conn))
{-# INLINE clientError #-}
clientError :: Results a
@ -45,30 +45,10 @@ single resultDec =
resultMaybe <- LibPQ.getResult connection
case resultMaybe of
Just result ->
mapLeft ResultError <$> Result.run resultDec (integerDatetimes, result)
first ResultError <$> Result.run resultDec integerDatetimes result
Nothing ->
fmap (Left . ClientError) (LibPQ.errorMessage connection)
-- |
-- Fetch a single result.
{-# INLINE getResult #-}
getResult :: Results LibPQ.Result
getResult =
Results
$ ReaderT
$ \(_, connection) -> ExceptT $ do
resultMaybe <- LibPQ.getResult connection
case resultMaybe of
Just result -> pure (Right result)
Nothing -> fmap (Left . ClientError) (LibPQ.errorMessage connection)
-- |
-- Fetch a single result.
{-# INLINE getResultMaybe #-}
getResultMaybe :: Results (Maybe LibPQ.Result)
getResultMaybe =
Results $ ReaderT $ \(_, connection) -> lift $ LibPQ.getResult connection
{-# INLINE dropRemainders #-}
dropRemainders :: Results ()
dropRemainders =
@ -84,11 +64,11 @@ dropRemainders =
loop integerDatetimes connection <* checkErrors
where
checkErrors =
ExceptT $ fmap (mapLeft ResultError) $ Result.run Result.noResult (integerDatetimes, result)
ExceptT $ fmap (first ResultError) $ Result.run Result.noResult integerDatetimes result
refine :: (a -> Either Text b) -> Results a -> Results b
refine refiner results = Results
refine refiner (Results stack) = Results
$ ReaderT
$ \env -> ExceptT $ do
resultEither <- run results env
return $ resultEither >>= mapLeft (ResultError . UnexpectedResult) . refiner
resultEither <- runExceptT $ runReaderT stack env
return $ resultEither >>= first (ResultError . UnexpectedResult) . refiner

View File

@ -1,8 +1,8 @@
module Hasql.Decoders.Row where
import Database.PostgreSQL.LibPQ qualified as LibPQ
import Hasql.Decoders.Value qualified as Value
import Hasql.Errors
import Hasql.LibPq14 qualified as LibPQ
import Hasql.Prelude hiding (error)
import PostgreSQL.Binary.Decoding qualified as A
@ -55,7 +55,7 @@ value valueDec =
Right Nothing
Just value ->
fmap Just
$ mapLeft ValueError
$ first ValueError
$ {-# SCC "decode" #-} A.valueParser (Value.run valueDec integerDatetimes) value
else pure (Left EndOfInput)

View File

@ -1,12 +1,37 @@
module Hasql.Encoders.Params where
import Database.PostgreSQL.LibPQ qualified as A
import Hasql.Encoders.Value qualified as C
import Hasql.LibPq14 qualified as A
import Hasql.PostgresTypeInfo qualified as D
import Hasql.Prelude
import PostgreSQL.Binary.Encoding qualified as B
import Text.Builder qualified as E
renderReadable :: Params a -> a -> [Text]
renderReadable (Params _ _ _ printer) params =
printer params
& toList
compilePreparedStatementData :: Params a -> Bool -> a -> ([A.Oid], [Maybe (ByteString, A.Format)])
compilePreparedStatementData (Params _ columnsMetadata serializer _) integerDatetimes input =
(oidList, valueAndFormatList)
where
(oidList, formatList) =
columnsMetadata & toList & unzip
valueAndFormatList =
serializer integerDatetimes input
& toList
& zipWith (\format encoding -> (,format) <$> encoding) formatList
compileUnpreparedStatementData :: Params a -> Bool -> a -> [Maybe (A.Oid, ByteString, A.Format)]
compileUnpreparedStatementData (Params _ columnsMetadata serializer printer) integerDatetimes input =
zipWith
( \(oid, format) encoding ->
(,,) <$> pure oid <*> encoding <*> pure format
)
(toList columnsMetadata)
(toList (serializer integerDatetimes input))
-- |
-- Encoder of some representation of a parameters product.
data Params a = Params

View File

@ -1,27 +1,28 @@
-- |
-- An API for retrieval of multiple results.
-- Can be used to handle:
--
-- * A single result,
--
-- * Individual results of a multi-statement query
-- with the help of "Applicative" and "Monad",
--
-- * Row-by-row fetching.
module Hasql.Errors where
import Data.ByteString.Char8 qualified as BC
import Hasql.Prelude
-- |
-- An error during the execution of a query.
-- Comes packed with the query template and a textual representation of the provided params.
data QueryError
= QueryError ByteString [Text] CommandError
-- | Error during execution of a session.
data SessionError
= -- | Error during the execution of a query.
-- Comes packed with the query template and a textual representation of the provided params.
QueryError
-- | SQL template.
ByteString
-- | Parameters rendered as human-readable SQL literals.
[Text]
-- | Error details.
CommandError
| -- | Error during the execution of a pipeline.
PipelineError
-- | Error details.
CommandError
deriving (Show, Eq, Typeable)
instance Exception QueryError where
displayException (QueryError query params commandError) =
instance Exception SessionError where
displayException = \case
QueryError query params commandError ->
let queryContext :: Maybe (ByteString, Int)
queryContext = case commandError of
ClientError _ -> Nothing
@ -66,9 +67,13 @@ instance Exception QueryError where
<> "\n Params: "
<> show params
<> "\n Error: "
<> case commandError of
<> renderCommandErrorAsReason commandError
PipelineError commandError ->
"PipelineError!\n Reason: " <> renderCommandErrorAsReason commandError
where
renderCommandErrorAsReason = \case
ClientError (Just message) -> "Client error: " <> show message
ClientError Nothing -> "Unknown client error"
ClientError Nothing -> "Client error without details"
ResultError resultError -> case resultError of
ServerError code message details hint position ->
"Server error "

View File

@ -2,12 +2,12 @@
-- An API of low-level IO operations.
module Hasql.IO where
import Database.PostgreSQL.LibPQ qualified as LibPQ
import Hasql.Commands qualified as Commands
import Hasql.Decoders.Result qualified as ResultDecoders
import Hasql.Decoders.Results qualified as ResultsDecoders
import Hasql.Encoders.Params qualified as ParamsEncoders
import Hasql.Errors
import Hasql.LibPq14 qualified as LibPQ
import Hasql.Prelude
import Hasql.PreparedStatementRegistry qualified as PreparedStatementRegistry
@ -62,9 +62,9 @@ getResults connection integerDatetimes decoder =
(<*) <$> get <*> dropRemainders
where
get =
ResultsDecoders.run decoder (integerDatetimes, connection)
ResultsDecoders.run decoder connection integerDatetimes
dropRemainders =
ResultsDecoders.run ResultsDecoders.dropRemainders (integerDatetimes, connection)
ResultsDecoders.run ResultsDecoders.dropRemainders connection integerDatetimes
{-# INLINE getPreparedStatementKey #-}
getPreparedStatementKey ::
@ -78,19 +78,16 @@ getPreparedStatementKey connection registry template oidList =
PreparedStatementRegistry.update localKey onNewRemoteKey onOldRemoteKey registry
where
localKey =
PreparedStatementRegistry.LocalKey template wordOIDList
where
wordOIDList =
map (\(LibPQ.Oid x) -> fromIntegral x) oidList
PreparedStatementRegistry.LocalKey template oidList
onNewRemoteKey key =
do
sent <- LibPQ.sendPrepare connection key template (mfilter (not . null) (Just oidList))
let resultsDecoder =
fmap resultsMapping $ getResults connection undefined (resultsDecoder sent)
where
resultsDecoder sent =
if sent
then ResultsDecoders.single ResultDecoders.noResult
else ResultsDecoders.clientError
fmap resultsMapping $ getResults connection undefined resultsDecoder
where
resultsMapping =
\case
Left x -> (False, Left x)
@ -114,17 +111,13 @@ sendPreparedParametricStatement ::
ParamsEncoders.Params a ->
a ->
IO (Either CommandError ())
sendPreparedParametricStatement connection registry integerDatetimes template (ParamsEncoders.Params size columnsMetadata serializer _) input =
sendPreparedParametricStatement connection registry integerDatetimes template encoder input =
runExceptT $ do
key <- ExceptT $ getPreparedStatementKey connection registry template oidList
ExceptT $ checkedSend connection $ LibPQ.sendQueryPrepared connection key valueAndFormatList LibPQ.Binary
where
(oidList, formatList) =
columnsMetadata & toList & unzip
valueAndFormatList =
serializer integerDatetimes input
& toList
& zipWith (\format encoding -> (,format) <$> encoding) formatList
(oidList, valueAndFormatList) =
ParamsEncoders.compilePreparedStatementData encoder integerDatetimes input
{-# INLINE sendUnpreparedParametricStatement #-}
sendUnpreparedParametricStatement ::
@ -134,15 +127,13 @@ sendUnpreparedParametricStatement ::
ParamsEncoders.Params a ->
a ->
IO (Either CommandError ())
sendUnpreparedParametricStatement connection integerDatetimes template (ParamsEncoders.Params _ columnsMetadata serializer printer) input =
let params =
zipWith
( \(oid, format) encoding ->
(,,) <$> pure oid <*> encoding <*> pure format
)
(toList columnsMetadata)
(toList (serializer integerDatetimes input))
in checkedSend connection $ LibPQ.sendQueryParams connection template params LibPQ.Binary
sendUnpreparedParametricStatement connection integerDatetimes template encoder input =
checkedSend connection
$ LibPQ.sendQueryParams
connection
template
(ParamsEncoders.compileUnpreparedStatementData encoder integerDatetimes input)
LibPQ.Binary
{-# INLINE sendParametricStatement #-}
sendParametricStatement ::

83
library/Hasql/LibPq14.hs Normal file
View File

@ -0,0 +1,83 @@
module Hasql.LibPq14
( module Base,
-- * Updated and new types
Mappings.ExecStatus (..),
Mappings.PipelineStatus (..),
-- * Updated and new procedures
resultStatus,
pipelineStatus,
enterPipelineMode,
exitPipelineMode,
pipelineSync,
sendFlushRequest,
)
where
import Database.PostgreSQL.LibPQ as Base hiding (ExecStatus (..), resultStatus)
import Database.PostgreSQL.LibPQ.Internal qualified as BaseInternal
import Hasql.LibPq14.Ffi qualified as Ffi
import Hasql.LibPq14.Mappings qualified as Mappings
import Hasql.Prelude
resultStatus :: Result -> IO Mappings.ExecStatus
resultStatus result = do
-- Unsafe-coercing because the constructor is not exposed by the lib,
-- but it's implemented as a newtype over ForeignPtr.
-- Since internal changes in the \"postgresql-lipbq\" may break this,
-- it requires us to avoid using an open dependency range on it.
ffiStatus <- withForeignPtr (unsafeCoerce result) Ffi.resultStatus
decodeProcedureResult "resultStatus" Mappings.decodeExecStatus ffiStatus
pipelineStatus ::
Connection ->
IO Mappings.PipelineStatus
pipelineStatus =
parameterlessProcedure "pipelineStatus" Ffi.pipelineStatus Mappings.decodePipelineStatus
enterPipelineMode ::
Connection ->
IO Bool
enterPipelineMode =
parameterlessProcedure "enterPipelineMode" Ffi.enterPipelineMode Mappings.decodeBool
exitPipelineMode ::
Connection ->
IO Bool
exitPipelineMode =
parameterlessProcedure "exitPipelineMode" Ffi.exitPipelineMode Mappings.decodeBool
pipelineSync ::
Connection ->
IO Bool
pipelineSync =
parameterlessProcedure "pipelineSync" Ffi.pipelineSync Mappings.decodeBool
sendFlushRequest ::
Connection ->
IO Bool
sendFlushRequest =
parameterlessProcedure "sendFlushRequest" Ffi.sendFlushRequest Mappings.decodeBool
parameterlessProcedure ::
(Show a) =>
String ->
(Ptr BaseInternal.PGconn -> IO a) ->
(a -> Maybe b) ->
Connection ->
IO b
parameterlessProcedure label procedure decoder connection = do
ffiResult <- BaseInternal.withConn connection procedure
decodeProcedureResult label decoder ffiResult
decodeProcedureResult ::
(Show a) =>
String ->
(a -> Maybe b) ->
a ->
IO b
decodeProcedureResult label decoder ffiResult =
case decoder ffiResult of
Just res -> pure res
Nothing -> fail ("Failed to decode result of " <> label <> " from: " <> show ffiResult)

View File

@ -0,0 +1,25 @@
{-# LANGUAGE CApiFFI #-}
module Hasql.LibPq14.Ffi where
import Database.PostgreSQL.LibPQ.Internal
import Foreign.C.Types (CInt (..))
import Hasql.Prelude
foreign import capi "libpq-fe.h PQresultStatus"
resultStatus :: Ptr () -> IO CInt
foreign import capi "libpq-fe.h PQpipelineStatus"
pipelineStatus :: Ptr PGconn -> IO CInt
foreign import capi "libpq-fe.h PQenterPipelineMode"
enterPipelineMode :: Ptr PGconn -> IO CInt
foreign import capi "libpq-fe.h PQexitPipelineMode"
exitPipelineMode :: Ptr PGconn -> IO CInt
foreign import capi "libpq-fe.h PQpipelineSync"
pipelineSync :: Ptr PGconn -> IO CInt
foreign import capi "libpq-fe.h PQsendFlushRequest"
sendFlushRequest :: Ptr PGconn -> IO CInt

View File

@ -0,0 +1,71 @@
module Hasql.LibPq14.Mappings where
#include "libpq-fe.h"
import Foreign.C.Types (CInt (..))
import Hasql.Prelude
data ExecStatus
= EmptyQuery
| CommandOk
| TuplesOk
| CopyOut
| CopyIn
| CopyBoth
| BadResponse
| NonfatalError
| FatalError
| SingleTuple
| PipelineSync
| PipelineAbort
deriving (Eq, Show)
decodeExecStatus :: CInt -> Maybe ExecStatus
decodeExecStatus = \case
(#const PGRES_EMPTY_QUERY) -> Just EmptyQuery
(#const PGRES_COMMAND_OK) -> Just CommandOk
(#const PGRES_TUPLES_OK) -> Just TuplesOk
(#const PGRES_COPY_OUT) -> Just CopyOut
(#const PGRES_COPY_IN) -> Just CopyIn
(#const PGRES_COPY_BOTH) -> Just CopyBoth
(#const PGRES_BAD_RESPONSE) -> Just BadResponse
(#const PGRES_NONFATAL_ERROR) -> Just NonfatalError
(#const PGRES_FATAL_ERROR) -> Just FatalError
(#const PGRES_SINGLE_TUPLE) -> Just SingleTuple
(#const PGRES_PIPELINE_SYNC) -> Just PipelineSync
(#const PGRES_PIPELINE_ABORTED) -> Just PipelineAbort
_ -> Nothing
encodeExecStatus :: ExecStatus -> CInt
encodeExecStatus = \case
EmptyQuery -> #const PGRES_EMPTY_QUERY
CommandOk -> #const PGRES_COMMAND_OK
TuplesOk -> #const PGRES_TUPLES_OK
CopyOut -> #const PGRES_COPY_OUT
CopyIn -> #const PGRES_COPY_IN
CopyBoth -> #const PGRES_COPY_BOTH
BadResponse -> #const PGRES_BAD_RESPONSE
NonfatalError -> #const PGRES_NONFATAL_ERROR
FatalError -> #const PGRES_FATAL_ERROR
SingleTuple -> #const PGRES_SINGLE_TUPLE
PipelineSync -> #const PGRES_PIPELINE_SYNC
PipelineAbort -> #const PGRES_PIPELINE_ABORTED
data PipelineStatus
= PipelineOn
| PipelineOff
| PipelineAborted
deriving (Eq, Show)
decodePipelineStatus :: CInt -> Maybe PipelineStatus
decodePipelineStatus = \case
(#const PQ_PIPELINE_ON) -> Just PipelineOn
(#const PQ_PIPELINE_OFF) -> Just PipelineOff
(#const PQ_PIPELINE_ABORTED) -> Just PipelineAborted
_ -> Nothing
decodeBool :: CInt -> Maybe Bool
decodeBool = \case
0 -> Just False
1 -> Just True
_ -> Nothing

View File

@ -0,0 +1,7 @@
module Hasql.Pipeline
( Pipeline,
statement,
)
where
import Hasql.Pipeline.Core

View File

@ -0,0 +1,206 @@
module Hasql.Pipeline.Core where
import Hasql.Decoders.All qualified as Decoders
import Hasql.Decoders.Result qualified as Decoders.Result
import Hasql.Decoders.Results qualified as Decoders.Results
import Hasql.Encoders.All qualified as Encoders
import Hasql.Encoders.Params qualified as Encoders.Params
import Hasql.Errors
import Hasql.LibPq14 qualified as Pq
import Hasql.Prelude
import Hasql.PreparedStatementRegistry qualified as PreparedStatementRegistry
import Hasql.Statement qualified as Statement
run :: forall a. Pipeline a -> Pq.Connection -> PreparedStatementRegistry.PreparedStatementRegistry -> Bool -> IO (Either SessionError a)
run (Pipeline sendQueriesInIO) connection registry integerDatetimes = do
runExceptT do
enterPipelineMode
recvQueries <- sendQueries
pipelineSync
finallyE recvQueries do
recvPipelineSync
exitPipelineMode
where
enterPipelineMode :: ExceptT SessionError IO ()
enterPipelineMode =
runCommand $ Pq.enterPipelineMode connection
exitPipelineMode :: ExceptT SessionError IO ()
exitPipelineMode =
runCommand $ Pq.exitPipelineMode connection
sendQueries :: ExceptT SessionError IO (ExceptT SessionError IO a)
sendQueries =
fmap ExceptT $ ExceptT $ sendQueriesInIO connection registry integerDatetimes
pipelineSync :: ExceptT SessionError IO ()
pipelineSync =
runCommand $ Pq.pipelineSync connection
recvPipelineSync :: ExceptT SessionError IO ()
recvPipelineSync =
runResultsDecoder
$ Decoders.Results.single Decoders.Result.pipelineSync
runResultsDecoder :: forall a. Decoders.Results.Results a -> ExceptT SessionError IO a
runResultsDecoder decoder =
ExceptT
$ fmap (first PipelineError)
$ Decoders.Results.run decoder connection integerDatetimes
runCommand :: IO Bool -> ExceptT SessionError IO ()
runCommand action =
lift action >>= \case
True -> pure ()
False -> ExceptT (Left . PipelineError . ClientError <$> Pq.errorMessage connection)
-- |
-- Composable abstraction over the execution of queries in [the pipeline mode](https://www.postgresql.org/docs/current/libpq-pipeline-mode.html).
--
-- It allows you to issue multiple queries to the server in much fewer network transactions.
-- If the amounts of sent and received data do not surpass the buffer sizes in the driver and on the server it will be just a single roundtrip.
-- Typically the buffer size is 8KB.
--
-- This execution mode is much more efficient than running queries directly from 'Hasql.Session.Session', because in session every statement execution involves a dedicated network roundtrip.
-- An obvious question rises then: why not execute all queries like that?
--
-- In situations where the parameters depend on the result of another query it is impossible to execute them in parallel, because the client needs to receive the results of one query before sending the request to execute the next.
-- This reasoning is essentially the same as the one for the difference between 'Applicative' and 'Monad'.
-- That\'s why 'Pipeline' does not have the 'Monad' instance.
--
-- To execute 'Pipeline' lift it into 'Hasql.Session.Session' via 'Hasql.Session.pipeline'.
--
-- __Attention__: using this feature requires \"libpq\" of version >14.
--
-- == __Examples__
--
-- === Insert-Many or Batch-Insert
--
-- You can use pipeline to turn a single-row insert query into an efficient multi-row insertion session.
-- In effect this should be comparable in performance to issuing a single multi-row insert statement.
--
-- Given the following definition in a Statements module:
--
-- @
-- insertOrder :: 'Hasql.Statement.Statement' OrderDetails OrderId
-- @
--
-- You can lift it into the following session
--
-- @
-- insertOrders :: [OrderDetails] -> 'Hasql.Session.Session' [OrderId]
-- insertOrders songs =
-- 'Hasql.Session.pipeline' $
-- forM songs $ \song ->
-- 'Hasql.Pipeline.statement' song Statements.insertOrder
-- @
--
-- === Combining Queries
--
-- Given the following definitions in a Statements module:
--
-- @
-- selectOrderDetails :: 'Hasql.Statement.Statement' OrderId (Maybe OrderDetails)
-- selectOrderProducts :: 'Hasql.Statement.Statement' OrderId [OrderProduct]
-- selectOrderFinancialTransactions :: 'Hasql.Statement.Statement' OrderId [FinancialTransaction]
-- @
--
-- You can combine them into a session using the `ApplicativeDo` extension as follows:
--
-- @
-- selectEverythingAboutOrder :: OrderId -> 'Hasql.Session.Session' (Maybe OrderDetails, [OrderProduct], [FinancialTransaction])
-- selectEverythingAboutOrder orderId =
-- 'Hasql.Session.pipeline' $ do
-- details <- 'Hasql.Pipeline.statement' orderId Statements.selectOrderDetails
-- products <- 'Hasql.Pipeline.statement' orderId Statements.selectOrderProducts
-- transactions <- 'Hasql.Pipeline.statement' orderId Statements.selectOrderFinancialTransactions
-- pure (details, products, transactions)
-- @
newtype Pipeline a
= Pipeline
( Pq.Connection ->
PreparedStatementRegistry.PreparedStatementRegistry ->
Bool ->
IO (Either SessionError (IO (Either SessionError a)))
)
deriving (Functor)
instance Applicative Pipeline where
pure a =
Pipeline (\_ _ _ -> pure (Right (pure (Right a))))
Pipeline lSend <*> Pipeline rSend =
Pipeline \conn reg integerDatetimes ->
lSend conn reg integerDatetimes >>= \case
Left sendErr ->
pure (Left sendErr)
Right lRecv ->
rSend conn reg integerDatetimes <&> \case
Left sendErr ->
Left sendErr
Right rRecv ->
Right (liftA2 (<*>) lRecv rRecv)
-- |
-- Execute a statement in pipelining mode.
statement :: params -> Statement.Statement params result -> Pipeline result
statement params (Statement.Statement sql (Encoders.Params encoder) (Decoders.Result decoder) preparable) =
Pipeline run
where
run connection registry integerDatetimes =
if preparable
then runPrepared
else runUnprepared
where
runPrepared = runExceptT do
(key, keyRecv) <- ExceptT resolvePreparedStatementKey
queryRecv <- ExceptT (sendQuery key)
pure (keyRecv *> queryRecv)
where
(oidList, valueAndFormatList) =
Encoders.Params.compilePreparedStatementData encoder integerDatetimes params
resolvePreparedStatementKey =
PreparedStatementRegistry.update localKey onNewRemoteKey onOldRemoteKey registry
where
localKey =
PreparedStatementRegistry.LocalKey sql oidList
onNewRemoteKey key =
do
sent <- Pq.sendPrepare connection key sql (mfilter (not . null) (Just oidList))
if sent
then pure (True, Right (key, recv))
else (False,) . Left . commandToSessionError . ClientError <$> Pq.errorMessage connection
where
recv =
fmap (first commandToSessionError)
$ (<*)
<$> Decoders.Results.run (Decoders.Results.single Decoders.Result.noResult) connection integerDatetimes
<*> Decoders.Results.run Decoders.Results.dropRemainders connection integerDatetimes
onOldRemoteKey key =
pure (Right (key, pure (Right ())))
sendQuery key =
Pq.sendQueryPrepared connection key valueAndFormatList Pq.Binary >>= \case
False -> Left . commandToSessionError . ClientError <$> Pq.errorMessage connection
True -> pure (Right recv)
where
recv =
fmap (first commandToSessionError)
$ (<*)
<$> Decoders.Results.run decoder connection integerDatetimes
<*> Decoders.Results.run Decoders.Results.dropRemainders connection integerDatetimes
runUnprepared =
Pq.sendQueryParams connection sql (Encoders.Params.compileUnpreparedStatementData encoder integerDatetimes params) Pq.Binary >>= \case
False -> Left . commandToSessionError . ClientError <$> Pq.errorMessage connection
True -> pure (Right recv)
where
recv =
fmap (first commandToSessionError)
$ (<*)
<$> Decoders.Results.run decoder connection integerDatetimes
<*> Decoders.Results.run Decoders.Results.dropRemainders connection integerDatetimes
commandToSessionError =
QueryError sql (Encoders.Params.renderReadable encoder params)

View File

@ -1,6 +1,6 @@
module Hasql.PostgresTypeInfo where
import Database.PostgreSQL.LibPQ qualified as LibPQ
import Hasql.LibPq14 qualified as LibPQ
import Hasql.Prelude hiding (bool)
-- | A Postgresql type info

View File

@ -7,7 +7,6 @@ module Hasql.Prelude
forMToZero_,
forMFromZero_,
strictCons,
mapLeft,
)
where
@ -25,7 +24,7 @@ import Control.Monad.Reader.Class as Exports (MonadReader (..))
import Control.Monad.ST as Exports
import Control.Monad.Trans.Class as Exports
import Control.Monad.Trans.Cont as Exports hiding (callCC, shift)
import Control.Monad.Trans.Except as Exports (Except, ExceptT (ExceptT), catchE, except, mapExcept, mapExceptT, runExcept, runExceptT, throwE, withExcept, withExceptT)
import Control.Monad.Trans.Except as Exports (Except, ExceptT (ExceptT), catchE, except, finallyE, mapExcept, mapExceptT, runExcept, runExceptT, throwE, withExcept, withExceptT)
import Control.Monad.Trans.Maybe as Exports
import Control.Monad.Trans.Reader as Exports (Reader, ReaderT (ReaderT), mapReader, mapReaderT, runReader, runReaderT, withReader, withReaderT)
import Control.Monad.Trans.State.Strict as Exports (State, StateT (StateT), evalState, evalStateT, execState, execStateT, mapState, mapStateT, runState, runStateT, withState, withStateT)
@ -130,8 +129,3 @@ forMFromZero_ !endN f =
strictCons :: a -> [a] -> [a]
strictCons !a b =
let !c = a : b in c
{-# INLINE mapLeft #-}
mapLeft :: (a -> c) -> Either a b -> Either c b
mapLeft f =
either (Left . f) Right

View File

@ -8,6 +8,7 @@ where
import ByteString.StrictBuilder qualified as B
import Data.HashTable.IO qualified as A
import Hasql.LibPq14 qualified as Pq
import Hasql.Prelude hiding (lookup)
data PreparedStatementRegistry
@ -44,7 +45,7 @@ update localKey onNewRemoteKey onOldRemoteKey (PreparedStatementRegistry table c
-- |
-- Local statement key.
data LocalKey
= LocalKey !ByteString ![Word32]
= LocalKey !ByteString ![Pq.Oid]
deriving (Show, Eq)
instance Hashable LocalKey where

View File

@ -2,6 +2,7 @@ module Hasql.Session
( Session,
sql,
statement,
pipeline,
-- * Execution
run,

View File

@ -1,24 +1,26 @@
module Hasql.Session.Core where
import Hasql.Connection.Core qualified as Connection
import Hasql.Decoders.All qualified as Decoders
import Hasql.Decoders.Result qualified as Decoders.Result
import Hasql.Decoders.Results qualified as Decoders.Results
import Hasql.Encoders.All qualified as Encoders
import Hasql.Encoders.Params qualified as Encoders.Params
import Hasql.Errors
import Hasql.IO qualified as IO
import Hasql.Pipeline.Core qualified as Pipeline
import Hasql.Prelude
import Hasql.Statement qualified as Statement
-- |
-- A batch of actions to be executed in the context of a database connection.
newtype Session a
= Session (ReaderT Connection.Connection (ExceptT QueryError IO) a)
deriving (Functor, Applicative, Monad, MonadError QueryError, MonadIO, MonadReader Connection.Connection)
= Session (ReaderT Connection.Connection (ExceptT SessionError IO) a)
deriving (Functor, Applicative, Monad, MonadError SessionError, MonadIO, MonadReader Connection.Connection)
-- |
-- Executes a bunch of commands on the provided connection.
run :: Session a -> Connection.Connection -> IO (Either QueryError a)
run :: Session a -> Connection.Connection -> IO (Either SessionError a)
run (Session impl) connection =
runExceptT
$ runReaderT impl connection
@ -33,7 +35,7 @@ sql sql =
$ ReaderT
$ \(Connection.Connection pqConnectionRef integerDatetimes registry) ->
ExceptT
$ fmap (mapLeft (QueryError sql []))
$ fmap (first (QueryError sql []))
$ withMVar pqConnectionRef
$ \pqConnection -> do
r1 <- IO.sendNonparametricStatement pqConnection sql
@ -44,20 +46,24 @@ sql sql =
Decoders.Results.single Decoders.Result.noResult
-- |
-- Parameters and a specification of a parametric single-statement query to apply them to.
-- Execute a statement by providing parameters to it.
statement :: params -> Statement.Statement params result -> Session result
statement input (Statement.Statement template (Encoders.Params paramsEncoder@(Encoders.Params.Params _ _ _ printer)) decoder preparable) =
statement input (Statement.Statement template (Encoders.Params paramsEncoder) (Decoders.Result decoder) preparable) =
Session
$ ReaderT
$ \(Connection.Connection pqConnectionRef integerDatetimes registry) ->
ExceptT
$ fmap (mapLeft (QueryError template inputReps))
$ fmap (first (QueryError template (Encoders.Params.renderReadable paramsEncoder input)))
$ withMVar pqConnectionRef
$ \pqConnection -> do
r1 <- IO.sendParametricStatement pqConnection integerDatetimes registry template paramsEncoder preparable input
r2 <- IO.getResults pqConnection integerDatetimes (unsafeCoerce decoder)
r2 <- IO.getResults pqConnection integerDatetimes decoder
return $ r1 *> r2
where
inputReps =
printer input
& toList
-- |
-- Execute a pipeline.
pipeline :: Pipeline.Pipeline result -> Session result
pipeline pipeline =
Session $ ReaderT \(Connection.Connection pqConnectionRef integerDatetimes registry) ->
ExceptT $ withMVar pqConnectionRef \pqConnection ->
Pipeline.run pipeline pqConnection registry integerDatetimes

View File

@ -5,7 +5,7 @@ import Hasql.Decoders qualified as Decoders
import Hasql.Encoders qualified as Encoders
import Hasql.Session qualified as Session
import Hasql.Statement qualified as Statement
import Hasql.TestingUtils.TestingDsl qualified as Session
import Hasql.TestingKit.TestingDsl qualified as Session
import Main.Connection qualified as Connection
import Main.Prelude hiding (assert)
import Main.Statements qualified as Statements

View File

@ -1,4 +1,4 @@
module Hasql.TestingUtils.Constants where
module Hasql.TestingKit.Constants where
import Hasql.Connection qualified as Connection

View File

@ -0,0 +1,44 @@
module Hasql.TestingKit.Statements.BrokenSyntax where
import Hasql.Decoders qualified as Decoders
import Hasql.Encoders qualified as Encoders
import Hasql.Pipeline qualified as Pipeline
import Hasql.Session qualified as Session
import Hasql.Statement qualified as Statement
import Prelude
data Params = Params
{ start :: Int64,
end :: Int64
}
type Result = [Int64]
session :: Bool -> Params -> Session.Session Result
session prepared params =
Session.statement params (statement prepared)
pipeline :: Bool -> Params -> Pipeline.Pipeline Result
pipeline prepared params =
Pipeline.statement params (statement prepared)
statement :: Bool -> Statement.Statement Params Result
statement =
Statement.Statement sql encoder decoder
sql :: ByteString
sql =
"S"
encoder :: Encoders.Params Params
encoder =
mconcat
[ start >$< Encoders.param (Encoders.nonNullable Encoders.int8),
end >$< Encoders.param (Encoders.nonNullable Encoders.int8)
]
decoder :: Decoders.Result Result
decoder =
Decoders.rowList
( Decoders.column (Decoders.nonNullable Decoders.int8)
)

View File

@ -0,0 +1,44 @@
module Hasql.TestingKit.Statements.GenerateSeries where
import Hasql.Decoders qualified as Decoders
import Hasql.Encoders qualified as Encoders
import Hasql.Pipeline qualified as Pipeline
import Hasql.Session qualified as Session
import Hasql.Statement qualified as Statement
import Prelude
data Params = Params
{ start :: Int64,
end :: Int64
}
type Result = [Int64]
session :: Bool -> Params -> Session.Session Result
session prepared params =
Session.statement params (statement prepared)
pipeline :: Bool -> Params -> Pipeline.Pipeline Result
pipeline prepared params =
Pipeline.statement params (statement prepared)
statement :: Bool -> Statement.Statement Params Result
statement =
Statement.Statement sql encoder decoder
sql :: ByteString
sql =
"SELECT generate_series($1, $2)"
encoder :: Encoders.Params Params
encoder =
mconcat
[ start >$< Encoders.param (Encoders.nonNullable Encoders.int8),
end >$< Encoders.param (Encoders.nonNullable Encoders.int8)
]
decoder :: Decoders.Result Result
decoder =
Decoders.rowList
( Decoders.column (Decoders.nonNullable Decoders.int8)
)

View File

@ -0,0 +1,44 @@
module Hasql.TestingKit.Statements.WrongDecoder where
import Hasql.Decoders qualified as Decoders
import Hasql.Encoders qualified as Encoders
import Hasql.Pipeline qualified as Pipeline
import Hasql.Session qualified as Session
import Hasql.Statement qualified as Statement
import Prelude
data Params = Params
{ start :: Int64,
end :: Int64
}
type Result = [UUID]
session :: Bool -> Params -> Session.Session Result
session prepared params =
Session.statement params (statement prepared)
pipeline :: Bool -> Params -> Pipeline.Pipeline Result
pipeline prepared params =
Pipeline.statement params (statement prepared)
statement :: Bool -> Statement.Statement Params Result
statement =
Statement.Statement sql encoder decoder
sql :: ByteString
sql =
"SELECT generate_series($1, $2)"
encoder :: Encoders.Params Params
encoder =
mconcat
[ start >$< Encoders.param (Encoders.nonNullable Encoders.int8),
end >$< Encoders.param (Encoders.nonNullable Encoders.int8)
]
decoder :: Decoders.Result Result
decoder =
Decoders.rowList
( Decoders.column (Decoders.nonNullable Decoders.uuid)
)

View File

@ -0,0 +1,57 @@
module Hasql.TestingKit.TestingDsl
( -- * Errors
Error (..),
Session.SessionError (..),
Session.CommandError (..),
Session.ResultError (..),
Session.RowError (..),
-- * Abstractions
Session.Session,
Pipeline.Pipeline,
Statement.Statement (..),
-- * Execution
runSessionOnLocalDb,
runPipelineOnLocalDb,
runStatementInSession,
runPipelineInSession,
)
where
import Hasql.Connection qualified as Connection
import Hasql.Pipeline qualified as Pipeline
import Hasql.Session qualified as Session
import Hasql.Statement qualified as Statement
import Hasql.TestingKit.Constants qualified as Constants
import Prelude
data Error
= ConnectionError (Connection.ConnectionError)
| SessionError (Session.SessionError)
deriving (Show, Eq)
runSessionOnLocalDb :: Session.Session a -> IO (Either Error a)
runSessionOnLocalDb session =
runExceptT $ acquire >>= \connection -> use connection <* release connection
where
acquire =
ExceptT $ fmap (first ConnectionError) $ Connection.acquire Constants.localConnectionSettings
use connection =
ExceptT
$ fmap (first SessionError)
$ Session.run session connection
release connection =
lift $ Connection.release connection
runPipelineOnLocalDb :: Pipeline.Pipeline a -> IO (Either Error a)
runPipelineOnLocalDb =
runSessionOnLocalDb . Session.pipeline
runStatementInSession :: Statement.Statement a b -> a -> Session.Session b
runStatementInSession statement params =
Session.statement params statement
runPipelineInSession :: Pipeline.Pipeline a -> Session.Session a
runPipelineInSession =
Session.pipeline

View File

@ -1,37 +0,0 @@
module Hasql.TestingUtils.TestingDsl
( Session.Session,
SessionError (..),
Session.QueryError (..),
Session.CommandError (..),
runSessionOnLocalDb,
runStatementInSession,
)
where
import Hasql.Connection qualified as Connection
import Hasql.Session qualified as Session
import Hasql.Statement qualified as Statement
import Hasql.TestingUtils.Constants qualified as Constants
import Prelude
data SessionError
= ConnectionError (Connection.ConnectionError)
| SessionError (Session.QueryError)
deriving (Show, Eq)
runSessionOnLocalDb :: Session.Session a -> IO (Either SessionError a)
runSessionOnLocalDb session =
runExceptT $ acquire >>= \connection -> use connection <* release connection
where
acquire =
ExceptT $ fmap (mapLeft ConnectionError) $ Connection.acquire Constants.localConnectionSettings
use connection =
ExceptT
$ fmap (mapLeft SessionError)
$ Session.run session connection
release connection =
lift $ Connection.release connection
runStatementInSession :: Statement.Statement a b -> a -> Session.Session b
runStatementInSession statement params =
Session.statement params statement