hasql/library/Hasql/Pipeline/Core.hs
2024-05-08 09:26:14 +03:00

205 lines
8.5 KiB
Haskell

module Hasql.Pipeline.Core where
import Hasql.Decoders.All qualified as Decoders
import Hasql.Decoders.Result qualified as Decoders.Result
import Hasql.Decoders.Results qualified as Decoders.Results
import Hasql.Encoders.All qualified as Encoders
import Hasql.Encoders.Params qualified as Encoders.Params
import Hasql.Errors
import Hasql.LibPq14 qualified as Pq
import Hasql.Prelude
import Hasql.PreparedStatementRegistry qualified as PreparedStatementRegistry
import Hasql.Statement qualified as Statement
run :: forall a. Pipeline a -> Pq.Connection -> PreparedStatementRegistry.PreparedStatementRegistry -> Bool -> IO (Either SessionError a)
run (Pipeline sendQueriesInIO) connection registry integerDatetimes = do
runExceptT do
enterPipelineMode
recvQueries <- sendQueries
pipelineSync
finallyE recvQueries do
recvPipelineSync
exitPipelineMode
where
enterPipelineMode :: ExceptT SessionError IO ()
enterPipelineMode =
runCommand $ Pq.enterPipelineMode connection
exitPipelineMode :: ExceptT SessionError IO ()
exitPipelineMode =
runCommand $ Pq.exitPipelineMode connection
sendQueries :: ExceptT SessionError IO (ExceptT SessionError IO a)
sendQueries =
fmap ExceptT $ ExceptT $ sendQueriesInIO connection registry integerDatetimes
pipelineSync :: ExceptT SessionError IO ()
pipelineSync =
runCommand $ Pq.pipelineSync connection
recvPipelineSync :: ExceptT SessionError IO ()
recvPipelineSync =
runResultsDecoder
$ Decoders.Results.single Decoders.Result.pipelineSync
runResultsDecoder :: forall a. Decoders.Results.Results a -> ExceptT SessionError IO a
runResultsDecoder decoder =
ExceptT
$ fmap (first PipelineError)
$ Decoders.Results.run decoder connection integerDatetimes
runCommand :: IO Bool -> ExceptT SessionError IO ()
runCommand action =
lift action >>= \case
True -> pure ()
False -> ExceptT (Left . PipelineError . ClientError <$> Pq.errorMessage connection)
-- |
-- Composable abstraction over the execution of queries in [the pipeline mode](https://www.postgresql.org/docs/current/libpq-pipeline-mode.html).
--
-- It allows you to issue multiple queries to the server in much fewer network transactions.
-- If the amounts of sent and received data do not surpass the buffer sizes in the driver and on the server it will be just a single roundtrip.
-- Typically the buffer size is 8KB.
--
-- This execution mode is much more efficient than running queries directly from 'Hasql.Session.Session', because in session every statement execution involves a dedicated network roundtrip.
-- An obvious question rises then: why not execute all queries like that?
--
-- In situations where the parameters depend on the result of another query it is impossible to execute them in parallel, because the client needs to receive the results of one query before sending the request to execute the next.
-- This reasoning is essentially the same as the one for the difference between 'Applicative' and 'Monad'.
-- That\'s why 'Pipeline' does not have the 'Monad' instance.
--
-- To execute 'Pipeline' lift it into 'Hasql.Session.Session' via 'Hasql.Session.pipeline'.
--
-- == __Examples__
--
-- === Insert-Many or Batch-Insert
--
-- You can use pipeline to turn a single-row insert query into an efficient multi-row insertion session.
-- In effect this should be comparable in performance to issuing a single multi-row insert statement.
--
-- Given the following definition in a Statements module:
--
-- @
-- insertOrder :: 'Hasql.Statement.Statement' OrderDetails OrderId
-- @
--
-- You can lift it into the following session
--
-- @
-- insertOrders :: [OrderDetails] -> 'Hasql.Session.Session' [OrderId]
-- insertOrders orders =
-- 'Hasql.Session.pipeline' $
-- forM orders $ \\order ->
-- 'Hasql.Pipeline.statement' order Statements.insertOrder
-- @
--
-- === Combining Queries
--
-- Given the following definitions in a Statements module:
--
-- @
-- selectOrderDetails :: 'Hasql.Statement.Statement' OrderId (Maybe OrderDetails)
-- selectOrderProducts :: 'Hasql.Statement.Statement' OrderId [OrderProduct]
-- selectOrderFinancialTransactions :: 'Hasql.Statement.Statement' OrderId [FinancialTransaction]
-- @
--
-- You can combine them into a session using the `ApplicativeDo` extension as follows:
--
-- @
-- selectEverythingAboutOrder :: OrderId -> 'Hasql.Session.Session' (Maybe OrderDetails, [OrderProduct], [FinancialTransaction])
-- selectEverythingAboutOrder orderId =
-- 'Hasql.Session.pipeline' $ do
-- details <- 'Hasql.Pipeline.statement' orderId Statements.selectOrderDetails
-- products <- 'Hasql.Pipeline.statement' orderId Statements.selectOrderProducts
-- transactions <- 'Hasql.Pipeline.statement' orderId Statements.selectOrderFinancialTransactions
-- pure (details, products, transactions)
-- @
newtype Pipeline a
= Pipeline
( Pq.Connection ->
PreparedStatementRegistry.PreparedStatementRegistry ->
Bool ->
IO (Either SessionError (IO (Either SessionError a)))
)
deriving (Functor)
instance Applicative Pipeline where
pure a =
Pipeline (\_ _ _ -> pure (Right (pure (Right a))))
Pipeline lSend <*> Pipeline rSend =
Pipeline \conn reg integerDatetimes ->
lSend conn reg integerDatetimes >>= \case
Left sendErr ->
pure (Left sendErr)
Right lRecv ->
rSend conn reg integerDatetimes <&> \case
Left sendErr ->
Left sendErr
Right rRecv ->
Right (liftA2 (<*>) lRecv rRecv)
-- |
-- Execute a statement in pipelining mode.
statement :: params -> Statement.Statement params result -> Pipeline result
statement params (Statement.Statement sql (Encoders.Params encoder) (Decoders.Result decoder) preparable) =
Pipeline run
where
run connection registry integerDatetimes =
if preparable
then runPrepared
else runUnprepared
where
runPrepared = runExceptT do
(key, keyRecv) <- ExceptT resolvePreparedStatementKey
queryRecv <- ExceptT (sendQuery key)
pure (keyRecv *> queryRecv)
where
(oidList, valueAndFormatList) =
Encoders.Params.compilePreparedStatementData encoder integerDatetimes params
resolvePreparedStatementKey =
PreparedStatementRegistry.update localKey onNewRemoteKey onOldRemoteKey registry
where
localKey =
PreparedStatementRegistry.LocalKey sql oidList
onNewRemoteKey key =
do
sent <- Pq.sendPrepare connection key sql (mfilter (not . null) (Just oidList))
if sent
then pure (True, Right (key, recv))
else (False,) . Left . commandToSessionError . ClientError <$> Pq.errorMessage connection
where
recv =
fmap (first commandToSessionError)
$ (<*)
<$> Decoders.Results.run (Decoders.Results.single Decoders.Result.noResult) connection integerDatetimes
<*> Decoders.Results.run Decoders.Results.dropRemainders connection integerDatetimes
onOldRemoteKey key =
pure (Right (key, pure (Right ())))
sendQuery key =
Pq.sendQueryPrepared connection key valueAndFormatList Pq.Binary >>= \case
False -> Left . commandToSessionError . ClientError <$> Pq.errorMessage connection
True -> pure (Right recv)
where
recv =
fmap (first commandToSessionError)
$ (<*)
<$> Decoders.Results.run decoder connection integerDatetimes
<*> Decoders.Results.run Decoders.Results.dropRemainders connection integerDatetimes
runUnprepared =
Pq.sendQueryParams connection sql (Encoders.Params.compileUnpreparedStatementData encoder integerDatetimes params) Pq.Binary >>= \case
False -> Left . commandToSessionError . ClientError <$> Pq.errorMessage connection
True -> pure (Right recv)
where
recv =
fmap (first commandToSessionError)
$ (<*)
<$> Decoders.Results.run decoder connection integerDatetimes
<*> Decoders.Results.run Decoders.Results.dropRemainders connection integerDatetimes
commandToSessionError =
QueryError sql (Encoders.Params.renderReadable encoder params)