This commit is contained in:
Nikita Volkov 2024-04-21 08:54:52 +03:00
parent 117ebc3d0e
commit 851444711b

View File

@ -1,3 +1,5 @@
{-# OPTIONS_GHC -Wno-unused-imports -Wno-unused-binds #-}
module Hasql.Pipeline.Core where
import Database.PostgreSQL.LibPQ qualified as Pq
@ -14,23 +16,30 @@ import Hasql.Statement qualified as Statement
import System.IO (BufferMode (NoBuffering), hSetBuffering, stdout)
run :: Pipeline a -> Connection.Connection -> IO (Either QueryError a)
run (Pipeline send recv) (Connection.Connection pqConnectionRef integerDatetimes registry) = do
run (Pipeline send) (Connection.Connection pqConnectionRef integerDatetimes registry) = do
hSetBuffering stdout NoBuffering
withMVar pqConnectionRef \pqConnection -> do
putStrLn "enterPipelineMode"
runCommandFailing pqConnection $ Pq.enterPipelineMode pqConnection
putStrLn "send"
sendResult <- send pqConnection integerDatetimes registry
putStrLn "pipelineSync"
runCommandFailing pqConnection $ Pq.pipelineSync pqConnection
putStrLn "recv"
recvResult <- recv pqConnection integerDatetimes
putStrLn "exitPipelineMode"
handleEither =<< Decoders.Results.run Decoders.Results.dropRemainders pqConnection integerDatetimes
putStrLn "exitPipelineMode"
runCommandFailing pqConnection $ Pq.exitPipelineMode pqConnection
putStrLn "return"
pure (sendResult *> recvResult)
sendResult <- send pqConnection registry integerDatetimes
case sendResult of
Left err -> do
pure (Left err)
Right recv -> do
putStrLn "pipelineSync"
runCommandFailing pqConnection $ Pq.pipelineSync pqConnection
putStrLn "recv"
recvResult <- recv
case recvResult of
Left err -> pure (Left err)
Right res -> do
putStrLn "dropRemainders"
handleEither =<< Decoders.Results.run Decoders.Results.dropRemainders pqConnection integerDatetimes
putStrLn "exitPipelineMode"
runCommandFailing pqConnection $ Pq.exitPipelineMode pqConnection
putStrLn "return"
pure (sendResult *> recvResult)
where
runCommandFailing :: Pq.Connection -> IO Bool -> IO ()
runCommandFailing pqConn runCmd =
@ -39,47 +48,63 @@ run (Pipeline send recv) (Connection.Connection pqConnectionRef integerDatetimes
Right a -> pure a
Left err -> fail $ show err
data Pipeline a
newtype Pipeline a
= Pipeline
-- | Send commands.
(Pq.Connection -> Bool -> PreparedStatementRegistry.PreparedStatementRegistry -> IO (Either QueryError ()))
-- | Receive results.
(Pq.Connection -> Bool -> IO (Either QueryError a))
( Pq.Connection ->
PreparedStatementRegistry.PreparedStatementRegistry ->
Bool ->
IO (Either QueryError (IO (Either QueryError a)))
)
deriving (Functor)
instance Applicative Pipeline where
pure a =
Pipeline send recv
where
send _ _ _ =
pure (Right ())
recv _ _ =
pure (Right a)
Pipeline (\_ _ _ -> pure (Right (pure (Right a))))
Pipeline lSend lRecv <*> Pipeline rSend rRecv =
Pipeline send recv
where
send pqConn idt pReg = do
lSendRes <- lSend pqConn idt pReg
rSendRes <- rSend pqConn idt pReg
pure (lSendRes *> rSendRes)
recv pqConn idt = do
lRecvRes <- lRecv pqConn idt
rRecvRes <- rRecv pqConn idt
pure (lRecvRes <*> rRecvRes)
Pipeline lSend <*> Pipeline rSend =
Pipeline \conn reg integerDatetimes ->
lSend conn reg integerDatetimes >>= \case
Left sendErr ->
pure (Left sendErr)
Right lRecv ->
rSend conn reg integerDatetimes <&> \case
Left sendErr ->
Left sendErr
Right rRecv ->
Right (liftA2 (<*>) lRecv rRecv)
statement :: params -> Statement.Statement params result -> Pipeline result
statement params (Statement.Statement template (Encoders.Params paramsEncoder) (Decoders.Result decoder) preparable) =
Pipeline send recv
statement params (Statement.Statement sql (Encoders.Params encoder) (Decoders.Result decoder) preparable) =
Pipeline run
where
send pqConnection integerDatetimes registry =
mapLeft commandToQueryError
<$> IO.sendParametricStatement pqConnection integerDatetimes registry template paramsEncoder preparable params
run connection registry integerDatetimes = do
error "TODO"
where
(oidList, valueAndFormatList) =
Encoders.Params.compilePreparedStatementData encoder integerDatetimes params
recv pqConnection integerDatetimes =
mapLeft commandToQueryError
-- <$> Decoders.Results.run decoder (integerDatetimes, pqConnection)
<$> IO.getResults pqConnection integerDatetimes decoder
resolvePreparedStatementKey :: IO (Either QueryError (ByteString, IO (Either QueryError ())))
resolvePreparedStatementKey =
PreparedStatementRegistry.update localKey onNewRemoteKey onOldRemoteKey registry
where
localKey =
PreparedStatementRegistry.LocalKey sql oidList
onNewRemoteKey key =
do
sent <- Pq.sendPrepare connection key sql (mfilter (not . null) (Just oidList))
if sent
then pure (True, Right (key, recv))
else (False,) . Left . commandToQueryError . ClientError <$> Pq.errorMessage connection
where
recv :: IO (Either QueryError ())
recv = do
Pq.getResult connection >>= \case
Nothing ->
Left . commandToQueryError . ClientError <$> Pq.errorMessage connection
Just result ->
error "TODO"
onOldRemoteKey key =
pure (Right (key, pure (Right ())))
commandToQueryError =
QueryError template (Encoders.Params.renderReadable paramsEncoder params)
QueryError sql (Encoders.Params.renderReadable encoder params)