This commit is contained in:
Nikita Volkov 2024-04-21 09:02:28 +03:00
parent b07dc536fe
commit 3c36ebf3d7
3 changed files with 64 additions and 38 deletions

View File

@ -9,14 +9,14 @@ spec :: Spec
spec = do
describe "Single-statement" do
describe "Unprepared" do
it "Collects results and sends params" do
fit "Collects results and sends params" do
result <-
Dsl.runPipelineOnLocalDb
$ GenerateSeries.pipeline False GenerateSeries.Params {start = 0, end = 2}
shouldBe result (Right [0 .. 2])
describe "Prepared and sends params" do
fit "Collects results and sends params" do
describe "Prepared" do
it "Collects results and sends params" do
result <-
Dsl.runPipelineOnLocalDb
$ GenerateSeries.pipeline True GenerateSeries.Params {start = 0, end = 2}

View File

@ -25,7 +25,6 @@ noResult =
checkExecStatus $ \case
LibPQ.CommandOk -> True
LibPQ.TuplesOk -> True
LibPQ.PipelineSync -> True
_ -> False
{-# INLINE rowsAffected #-}

View File

@ -5,6 +5,7 @@ module Hasql.Pipeline.Core where
import Database.PostgreSQL.LibPQ qualified as Pq
import Hasql.Connection.Core qualified as Connection
import Hasql.Decoders.All qualified as Decoders
import Hasql.Decoders.Result qualified as Decoders.Result
import Hasql.Decoders.Results qualified as Decoders.Results
import Hasql.Encoders.All qualified as Encoders
import Hasql.Encoders.Params qualified as Encoders.Params
@ -31,15 +32,12 @@ run (Pipeline send) (Connection.Connection pqConnectionRef integerDatetimes regi
runCommandFailing pqConnection $ Pq.pipelineSync pqConnection
putStrLn "recv"
recvResult <- recv
case recvResult of
Left err -> pure (Left err)
Right res -> do
putStrLn "dropRemainders"
handleEither =<< Decoders.Results.run Decoders.Results.dropRemainders pqConnection integerDatetimes
putStrLn "exitPipelineMode"
runCommandFailing pqConnection $ Pq.exitPipelineMode pqConnection
putStrLn "return"
pure (sendResult *> recvResult)
putStrLn "dropRemainders"
handleEither =<< Decoders.Results.run Decoders.Results.dropRemainders pqConnection integerDatetimes
putStrLn "exitPipelineMode"
runCommandFailing pqConnection $ Pq.exitPipelineMode pqConnection
putStrLn "return"
pure recvResult
where
runCommandFailing :: Pq.Connection -> IO Bool -> IO ()
runCommandFailing pqConn runCmd =
@ -77,34 +75,63 @@ statement :: params -> Statement.Statement params result -> Pipeline result
statement params (Statement.Statement sql (Encoders.Params encoder) (Decoders.Result decoder) preparable) =
Pipeline run
where
run connection registry integerDatetimes = do
error "TODO"
run connection registry integerDatetimes =
if preparable
then runPrepared
else runUnprepared
where
(oidList, valueAndFormatList) =
Encoders.Params.compilePreparedStatementData encoder integerDatetimes params
resolvePreparedStatementKey :: IO (Either QueryError (ByteString, IO (Either QueryError ())))
resolvePreparedStatementKey =
PreparedStatementRegistry.update localKey onNewRemoteKey onOldRemoteKey registry
runPrepared = runExceptT do
(key, keyRecv) <- ExceptT resolvePreparedStatementKey
queryRecv <- ExceptT (sendQuery key)
pure (keyRecv *> queryRecv)
where
localKey =
PreparedStatementRegistry.LocalKey sql oidList
onNewRemoteKey key =
do
sent <- Pq.sendPrepare connection key sql (mfilter (not . null) (Just oidList))
if sent
then pure (True, Right (key, recv))
else (False,) . Left . commandToQueryError . ClientError <$> Pq.errorMessage connection
(oidList, valueAndFormatList) =
Encoders.Params.compilePreparedStatementData encoder integerDatetimes params
resolvePreparedStatementKey =
PreparedStatementRegistry.update localKey onNewRemoteKey onOldRemoteKey registry
where
recv :: IO (Either QueryError ())
recv = do
Pq.getResult connection >>= \case
Nothing ->
Left . commandToQueryError . ClientError <$> Pq.errorMessage connection
Just result ->
error "TODO"
onOldRemoteKey key =
pure (Right (key, pure (Right ())))
localKey =
PreparedStatementRegistry.LocalKey sql oidList
onNewRemoteKey key =
do
sent <- Pq.sendPrepare connection key sql (mfilter (not . null) (Just oidList))
if sent
then pure (True, Right (key, recv))
else (False,) . Left . commandToQueryError . ClientError <$> Pq.errorMessage connection
where
recv :: IO (Either QueryError ())
recv = do
Pq.getResult connection >>= \case
Nothing ->
Left . commandToQueryError . ClientError <$> Pq.errorMessage connection
Just result ->
mapLeft (commandToQueryError . ResultError)
<$> Decoders.Result.run Decoders.Result.noResult integerDatetimes result
onOldRemoteKey key =
pure (Right (key, pure (Right ())))
sendQuery key =
Pq.sendQueryPrepared connection key valueAndFormatList Pq.Binary >>= \case
False -> Left . commandToQueryError . ClientError <$> Pq.errorMessage connection
True -> pure (Right recv)
where
recv =
fmap (mapLeft commandToQueryError)
$ (<*)
<$> Decoders.Results.run decoder connection integerDatetimes
<*> Decoders.Results.run Decoders.Results.dropRemainders connection integerDatetimes
runUnprepared =
Pq.sendQueryParams connection sql (Encoders.Params.compileUnpreparedStatementData encoder integerDatetimes params) Pq.Binary >>= \case
False -> Left . commandToQueryError . ClientError <$> Pq.errorMessage connection
True -> pure (Right recv)
where
recv =
fmap (mapLeft commandToQueryError)
$ (<*)
<$> Decoders.Results.run decoder connection integerDatetimes
<*> Decoders.Results.run Decoders.Results.dropRemainders connection integerDatetimes
commandToQueryError =
QueryError sql (Encoders.Params.renderReadable encoder params)