From f96af01f082697f6549e774da5dfb4a2160d27d4 Mon Sep 17 00:00:00 2001 From: Nikita Volkov Date: Sat, 20 Apr 2024 18:35:32 +0300 Subject: [PATCH] Progress --- hspec/Hasql/PipelineSpec.hs | 36 ++++++++++++++----- library/Hasql/Decoders/Results.hs | 10 +++--- library/Hasql/IO.hs | 14 ++++---- library/Hasql/Pipeline/Core.hs | 27 +++++++++++--- .../TestingUtils/Statements/GenerateSeries.hs | 4 +-- 5 files changed, 65 insertions(+), 26 deletions(-) diff --git a/hspec/Hasql/PipelineSpec.hs b/hspec/Hasql/PipelineSpec.hs index 214ef7f..291e3d3 100644 --- a/hspec/Hasql/PipelineSpec.hs +++ b/hspec/Hasql/PipelineSpec.hs @@ -7,18 +7,38 @@ import Prelude spec :: Spec spec = do + describe "Single-statement" do + describe "Unprepared" do + it "Collects results and sends params" do + result <- + Dsl.runPipelineOnLocalDb + $ GenerateSeries.pipeline False GenerateSeries.Params {start = 0, end = 2} + shouldBe result (Right [0 .. 2]) + + describe "Prepared and sends params" do + fit "Collects results and sends params" do + result <- + Dsl.runPipelineOnLocalDb + $ GenerateSeries.pipeline True GenerateSeries.Params {start = 0, end = 2} + shouldBe result (Right [0 .. 2]) + describe "Normally" do describe "On prepared statements" do - it "Collects results" do - _result <- + it "Collects results and sends params" do + result <- Dsl.runPipelineOnLocalDb - $ (,) - <$> GenerateSeries.pipeline True GenerateSeries.Params {start = 0, end = 1000} - <*> GenerateSeries.pipeline True GenerateSeries.Params {start = 0, end = 1000} - pending + $ replicateM 2 + $ GenerateSeries.pipeline True GenerateSeries.Params {start = 0, end = 2} + shouldBe result (Right [[0 .. 2], [0 .. 2]]) + describe "On unprepared statements" do - it "Works" do - pending + it "Collects results and sends params" do + result <- + Dsl.runPipelineOnLocalDb + $ replicateM 2 + $ GenerateSeries.pipeline False GenerateSeries.Params {start = 0, end = 2} + shouldBe result (Right [[0 .. 2], [0 .. 2]]) + describe "When some part fails" do it "Works" do pending diff --git a/library/Hasql/Decoders/Results.hs b/library/Hasql/Decoders/Results.hs index 26f218a..040ec38 100644 --- a/library/Hasql/Decoders/Results.hs +++ b/library/Hasql/Decoders/Results.hs @@ -21,9 +21,9 @@ newtype Results a deriving (Functor, Applicative, Monad) {-# INLINE run #-} -run :: Results a -> (Bool, LibPQ.Connection) -> IO (Either CommandError a) -run (Results stack) env = - runExceptT (runReaderT stack env) +run :: Results a -> LibPQ.Connection -> Bool -> IO (Either CommandError a) +run (Results stack) conn idt = + runExceptT (runReaderT stack (idt, conn)) {-# INLINE clientError #-} clientError :: Results a @@ -87,8 +87,8 @@ dropRemainders = ExceptT $ fmap (mapLeft ResultError) $ Result.run Result.noResult (integerDatetimes, result) refine :: (a -> Either Text b) -> Results a -> Results b -refine refiner results = Results +refine refiner (Results stack) = Results $ ReaderT $ \env -> ExceptT $ do - resultEither <- run results env + resultEither <- runExceptT $ runReaderT stack env return $ resultEither >>= mapLeft (ResultError . UnexpectedResult) . refiner diff --git a/library/Hasql/IO.hs b/library/Hasql/IO.hs index b434b89..c47840b 100644 --- a/library/Hasql/IO.hs +++ b/library/Hasql/IO.hs @@ -62,9 +62,9 @@ getResults connection integerDatetimes decoder = (<*) <$> get <*> dropRemainders where get = - ResultsDecoders.run decoder (integerDatetimes, connection) + ResultsDecoders.run decoder connection integerDatetimes dropRemainders = - ResultsDecoders.run ResultsDecoders.dropRemainders (integerDatetimes, connection) + ResultsDecoders.run ResultsDecoders.dropRemainders connection integerDatetimes {-# INLINE getPreparedStatementKey #-} getPreparedStatementKey :: @@ -85,12 +85,12 @@ getPreparedStatementKey connection registry template oidList = onNewRemoteKey key = do sent <- LibPQ.sendPrepare connection key template (mfilter (not . null) (Just oidList)) - let resultsDecoder = - if sent - then ResultsDecoders.single ResultDecoders.noResult - else ResultsDecoders.clientError - fmap resultsMapping $ getResults connection undefined resultsDecoder + fmap resultsMapping $ getResults connection undefined (resultsDecoder sent) where + resultsDecoder sent = + if sent + then ResultsDecoders.single ResultDecoders.noResult + else ResultsDecoders.clientError resultsMapping = \case Left x -> (False, Left x) diff --git a/library/Hasql/Pipeline/Core.hs b/library/Hasql/Pipeline/Core.hs index 440fd20..5d2debe 100644 --- a/library/Hasql/Pipeline/Core.hs +++ b/library/Hasql/Pipeline/Core.hs @@ -3,6 +3,7 @@ module Hasql.Pipeline.Core where import Database.PostgreSQL.LibPQ qualified as Pq import Hasql.Connection.Core qualified as Connection import Hasql.Decoders.All qualified as Decoders +import Hasql.Decoders.Results qualified as Decoders.Results import Hasql.Encoders.All qualified as Encoders import Hasql.Encoders.Params qualified as Encoders.Params import Hasql.Errors @@ -10,16 +11,33 @@ import Hasql.IO qualified as IO import Hasql.Prelude import Hasql.PreparedStatementRegistry qualified as PreparedStatementRegistry import Hasql.Statement qualified as Statement +import System.IO (BufferMode (NoBuffering), hSetBuffering, stdout) run :: Pipeline a -> Connection.Connection -> IO (Either QueryError a) -run (Pipeline send recv) (Connection.Connection pqConnectionRef integerDatetimes registry) = +run (Pipeline send recv) (Connection.Connection pqConnectionRef integerDatetimes registry) = do + hSetBuffering stdout NoBuffering withMVar pqConnectionRef \pqConnection -> do - Pq.enterPipelineMode pqConnection + putStrLn "enterPipelineMode" + runCommandFailing pqConnection $ Pq.enterPipelineMode pqConnection + putStrLn "send" sendResult <- send pqConnection integerDatetimes registry - Pq.pipelineSync pqConnection + putStrLn "pipelineSync" + runCommandFailing pqConnection $ Pq.pipelineSync pqConnection + putStrLn "recv" recvResult <- recv pqConnection integerDatetimes - Pq.exitPipelineMode pqConnection + putStrLn "exitPipelineMode" + handleEither =<< Decoders.Results.run Decoders.Results.dropRemainders pqConnection integerDatetimes + putStrLn "exitPipelineMode" + runCommandFailing pqConnection $ Pq.exitPipelineMode pqConnection + putStrLn "return" pure (sendResult *> recvResult) + where + runCommandFailing :: Pq.Connection -> IO Bool -> IO () + runCommandFailing pqConn runCmd = + IO.checkedSend pqConn runCmd >>= handleEither + handleEither = \case + Right a -> pure a + Left err -> fail $ show err data Pipeline a = Pipeline @@ -60,6 +78,7 @@ statement params (Statement.Statement template (Encoders.Params paramsEncoder) ( recv pqConnection integerDatetimes = mapLeft commandToQueryError + -- <$> Decoders.Results.run decoder (integerDatetimes, pqConnection) <$> IO.getResults pqConnection integerDatetimes decoder commandToQueryError = diff --git a/testing-utils/Hasql/TestingUtils/Statements/GenerateSeries.hs b/testing-utils/Hasql/TestingUtils/Statements/GenerateSeries.hs index bb0e4b4..67937e2 100644 --- a/testing-utils/Hasql/TestingUtils/Statements/GenerateSeries.hs +++ b/testing-utils/Hasql/TestingUtils/Statements/GenerateSeries.hs @@ -12,7 +12,7 @@ data Params = Params end :: Int64 } -type Result = Vector Int64 +type Result = [Int64] session :: Bool -> Params -> Session.Session Result session prepared params = @@ -39,6 +39,6 @@ encoder = decoder :: Decoders.Result Result decoder = - Decoders.rowVector + Decoders.rowList ( Decoders.column (Decoders.nonNullable Decoders.int8) )