This commit is contained in:
Nikita Volkov 2024-04-20 18:35:32 +03:00
parent b587235bc6
commit f96af01f08
5 changed files with 65 additions and 26 deletions

View File

@ -7,18 +7,38 @@ import Prelude
spec :: Spec
spec = do
describe "Single-statement" do
describe "Unprepared" do
it "Collects results and sends params" do
result <-
Dsl.runPipelineOnLocalDb
$ GenerateSeries.pipeline False GenerateSeries.Params {start = 0, end = 2}
shouldBe result (Right [0 .. 2])
describe "Prepared and sends params" do
fit "Collects results and sends params" do
result <-
Dsl.runPipelineOnLocalDb
$ GenerateSeries.pipeline True GenerateSeries.Params {start = 0, end = 2}
shouldBe result (Right [0 .. 2])
describe "Normally" do
describe "On prepared statements" do
it "Collects results" do
_result <-
it "Collects results and sends params" do
result <-
Dsl.runPipelineOnLocalDb
$ (,)
<$> GenerateSeries.pipeline True GenerateSeries.Params {start = 0, end = 1000}
<*> GenerateSeries.pipeline True GenerateSeries.Params {start = 0, end = 1000}
pending
$ replicateM 2
$ GenerateSeries.pipeline True GenerateSeries.Params {start = 0, end = 2}
shouldBe result (Right [[0 .. 2], [0 .. 2]])
describe "On unprepared statements" do
it "Works" do
pending
it "Collects results and sends params" do
result <-
Dsl.runPipelineOnLocalDb
$ replicateM 2
$ GenerateSeries.pipeline False GenerateSeries.Params {start = 0, end = 2}
shouldBe result (Right [[0 .. 2], [0 .. 2]])
describe "When some part fails" do
it "Works" do
pending

View File

@ -21,9 +21,9 @@ newtype Results a
deriving (Functor, Applicative, Monad)
{-# INLINE run #-}
run :: Results a -> (Bool, LibPQ.Connection) -> IO (Either CommandError a)
run (Results stack) env =
runExceptT (runReaderT stack env)
run :: Results a -> LibPQ.Connection -> Bool -> IO (Either CommandError a)
run (Results stack) conn idt =
runExceptT (runReaderT stack (idt, conn))
{-# INLINE clientError #-}
clientError :: Results a
@ -87,8 +87,8 @@ dropRemainders =
ExceptT $ fmap (mapLeft ResultError) $ Result.run Result.noResult (integerDatetimes, result)
refine :: (a -> Either Text b) -> Results a -> Results b
refine refiner results = Results
refine refiner (Results stack) = Results
$ ReaderT
$ \env -> ExceptT $ do
resultEither <- run results env
resultEither <- runExceptT $ runReaderT stack env
return $ resultEither >>= mapLeft (ResultError . UnexpectedResult) . refiner

View File

@ -62,9 +62,9 @@ getResults connection integerDatetimes decoder =
(<*) <$> get <*> dropRemainders
where
get =
ResultsDecoders.run decoder (integerDatetimes, connection)
ResultsDecoders.run decoder connection integerDatetimes
dropRemainders =
ResultsDecoders.run ResultsDecoders.dropRemainders (integerDatetimes, connection)
ResultsDecoders.run ResultsDecoders.dropRemainders connection integerDatetimes
{-# INLINE getPreparedStatementKey #-}
getPreparedStatementKey ::
@ -85,12 +85,12 @@ getPreparedStatementKey connection registry template oidList =
onNewRemoteKey key =
do
sent <- LibPQ.sendPrepare connection key template (mfilter (not . null) (Just oidList))
let resultsDecoder =
fmap resultsMapping $ getResults connection undefined (resultsDecoder sent)
where
resultsDecoder sent =
if sent
then ResultsDecoders.single ResultDecoders.noResult
else ResultsDecoders.clientError
fmap resultsMapping $ getResults connection undefined resultsDecoder
where
resultsMapping =
\case
Left x -> (False, Left x)

View File

@ -3,6 +3,7 @@ module Hasql.Pipeline.Core where
import Database.PostgreSQL.LibPQ qualified as Pq
import Hasql.Connection.Core qualified as Connection
import Hasql.Decoders.All qualified as Decoders
import Hasql.Decoders.Results qualified as Decoders.Results
import Hasql.Encoders.All qualified as Encoders
import Hasql.Encoders.Params qualified as Encoders.Params
import Hasql.Errors
@ -10,16 +11,33 @@ import Hasql.IO qualified as IO
import Hasql.Prelude
import Hasql.PreparedStatementRegistry qualified as PreparedStatementRegistry
import Hasql.Statement qualified as Statement
import System.IO (BufferMode (NoBuffering), hSetBuffering, stdout)
run :: Pipeline a -> Connection.Connection -> IO (Either QueryError a)
run (Pipeline send recv) (Connection.Connection pqConnectionRef integerDatetimes registry) =
run (Pipeline send recv) (Connection.Connection pqConnectionRef integerDatetimes registry) = do
hSetBuffering stdout NoBuffering
withMVar pqConnectionRef \pqConnection -> do
Pq.enterPipelineMode pqConnection
putStrLn "enterPipelineMode"
runCommandFailing pqConnection $ Pq.enterPipelineMode pqConnection
putStrLn "send"
sendResult <- send pqConnection integerDatetimes registry
Pq.pipelineSync pqConnection
putStrLn "pipelineSync"
runCommandFailing pqConnection $ Pq.pipelineSync pqConnection
putStrLn "recv"
recvResult <- recv pqConnection integerDatetimes
Pq.exitPipelineMode pqConnection
putStrLn "exitPipelineMode"
handleEither =<< Decoders.Results.run Decoders.Results.dropRemainders pqConnection integerDatetimes
putStrLn "exitPipelineMode"
runCommandFailing pqConnection $ Pq.exitPipelineMode pqConnection
putStrLn "return"
pure (sendResult *> recvResult)
where
runCommandFailing :: Pq.Connection -> IO Bool -> IO ()
runCommandFailing pqConn runCmd =
IO.checkedSend pqConn runCmd >>= handleEither
handleEither = \case
Right a -> pure a
Left err -> fail $ show err
data Pipeline a
= Pipeline
@ -60,6 +78,7 @@ statement params (Statement.Statement template (Encoders.Params paramsEncoder) (
recv pqConnection integerDatetimes =
mapLeft commandToQueryError
-- <$> Decoders.Results.run decoder (integerDatetimes, pqConnection)
<$> IO.getResults pqConnection integerDatetimes decoder
commandToQueryError =

View File

@ -12,7 +12,7 @@ data Params = Params
end :: Int64
}
type Result = Vector Int64
type Result = [Int64]
session :: Bool -> Params -> Session.Session Result
session prepared params =
@ -39,6 +39,6 @@ encoder =
decoder :: Decoders.Result Result
decoder =
Decoders.rowVector
Decoders.rowList
( Decoders.column (Decoders.nonNullable Decoders.int8)
)