mirror of
https://github.com/postgres-haskell/postgres-wire.git
synced 2024-11-23 00:00:29 +03:00
Refactored Query module
This commit is contained in:
parent
2f877ab715
commit
88bb3ae7d7
@ -20,7 +20,7 @@ module Database.PostgreSQL.Driver
|
|||||||
, sendBatchAndSync
|
, sendBatchAndSync
|
||||||
, sendBatchAndFlush
|
, sendBatchAndFlush
|
||||||
, readNextData
|
, readNextData
|
||||||
, readReadyForQuery
|
, waitReadyForQuery
|
||||||
, sendSimpleQuery
|
, sendSimpleQuery
|
||||||
, describeStatement
|
, describeStatement
|
||||||
-- * Errors
|
-- * Errors
|
||||||
|
@ -358,8 +358,8 @@ sendMessage :: RawConnection -> ClientMessage -> IO ()
|
|||||||
sendMessage rawConn msg = void $
|
sendMessage rawConn msg = void $
|
||||||
rSend rawConn . runEncode $ encodeClientMessage msg
|
rSend rawConn . runEncode $ encodeClientMessage msg
|
||||||
|
|
||||||
sendEncode :: RawConnection -> Encode -> IO ()
|
sendEncode :: Connection -> Encode -> IO ()
|
||||||
sendEncode rawConn = void . rSend rawConn . runEncode
|
sendEncode conn = void . rSend (connRawConnection conn) . runEncode
|
||||||
|
|
||||||
withConnectionMode
|
withConnectionMode
|
||||||
:: Connection -> ConnectionMode -> (Connection -> IO a) -> IO a
|
:: Connection -> ConnectionMode -> (Connection -> IO a) -> IO a
|
||||||
|
@ -24,19 +24,38 @@ data Query = Query
|
|||||||
, qCachePolicy :: CachePolicy
|
, qCachePolicy :: CachePolicy
|
||||||
} deriving (Show)
|
} deriving (Show)
|
||||||
|
|
||||||
|
-- | Public
|
||||||
|
sendBatchAndFlush :: Connection -> [Query] -> IO ()
|
||||||
|
sendBatchAndFlush = sendBatchEndBy Flush
|
||||||
|
|
||||||
-- | Public
|
-- | Public
|
||||||
sendBatchAndSync :: Connection -> [Query] -> IO ()
|
sendBatchAndSync :: Connection -> [Query] -> IO ()
|
||||||
sendBatchAndSync = sendBatchEndBy Sync
|
sendBatchAndSync = sendBatchEndBy Sync
|
||||||
|
|
||||||
-- | Public
|
-- | Public
|
||||||
sendBatchAndFlush :: Connection -> [Query] -> IO ()
|
sendSimpleQuery :: Connection -> B.ByteString -> IO (Either Error ())
|
||||||
sendBatchAndFlush = sendBatchEndBy Flush
|
sendSimpleQuery conn q = withConnectionMode conn SimpleQueryMode $ \c -> do
|
||||||
|
sendMessage (connRawConnection c) $ SimpleQuery (StatementSQL q)
|
||||||
|
waitReadyForQuery c
|
||||||
|
|
||||||
|
-- | Public
|
||||||
|
readNextData :: Connection -> IO (Either Error DataMessage)
|
||||||
|
readNextData conn = readChan $ connOutDataChan conn
|
||||||
|
|
||||||
|
-- | Public
|
||||||
|
-- MUST BE called after every sended `Sync` message
|
||||||
|
-- discards all messages preceding `ReadyForQuery`
|
||||||
|
waitReadyForQuery :: Connection -> IO (Either Error ())
|
||||||
|
waitReadyForQuery = fmap (>>= (liftError . findFirstError))
|
||||||
|
. collectUntilReadyForQuery
|
||||||
|
where
|
||||||
|
liftError = maybe (Right ()) (Left . PostgresError)
|
||||||
|
|
||||||
-- Helper
|
-- Helper
|
||||||
sendBatchEndBy :: ClientMessage -> Connection -> [Query] -> IO ()
|
sendBatchEndBy :: ClientMessage -> Connection -> [Query] -> IO ()
|
||||||
sendBatchEndBy msg conn qs = do
|
sendBatchEndBy msg conn qs = do
|
||||||
batch <- constructBatch conn qs
|
batch <- constructBatch conn qs
|
||||||
sendEncode (connRawConnection conn) $ batch <> encodeClientMessage msg
|
sendEncode conn $ batch <> encodeClientMessage msg
|
||||||
|
|
||||||
constructBatch :: Connection -> [Query] -> IO Encode
|
constructBatch :: Connection -> [Query] -> IO Encode
|
||||||
constructBatch conn = fmap fold . traverse constructSingle
|
constructBatch conn = fmap fold . traverse constructSingle
|
||||||
@ -65,53 +84,18 @@ constructBatch conn = fmap fold . traverse constructSingle
|
|||||||
Execute pname noLimitToReceive
|
Execute pname noLimitToReceive
|
||||||
pure $ parseMessage <> bindMessage <> executeMessage
|
pure $ parseMessage <> bindMessage <> executeMessage
|
||||||
|
|
||||||
-- | Public
|
|
||||||
readNextData :: Connection -> IO (Either Error DataMessage)
|
|
||||||
readNextData conn = readChan $ connOutDataChan conn
|
|
||||||
|
|
||||||
-- | Public
|
|
||||||
sendSimpleQuery :: Connection -> B.ByteString -> IO (Either Error ())
|
|
||||||
sendSimpleQuery conn q = withConnectionMode conn SimpleQueryMode $ \c -> do
|
|
||||||
sendMessage (connRawConnection c) $ SimpleQuery (StatementSQL q)
|
|
||||||
readReadyForQuery c
|
|
||||||
|
|
||||||
|
|
||||||
-- | Public
|
|
||||||
-- SHOULD BE called after every sended `Sync` message
|
|
||||||
-- skips all messages except `ReadyForQuery`
|
|
||||||
readReadyForQuery :: Connection -> IO (Either Error ())
|
|
||||||
readReadyForQuery = fmap (>>= (liftError . findFirstError))
|
|
||||||
. collectBeforeReadyForQuery
|
|
||||||
where
|
|
||||||
liftError = maybe (Right ()) (Left . PostgresError)
|
|
||||||
|
|
||||||
findFirstError :: [ServerMessage] -> Maybe ErrorDesc
|
|
||||||
findFirstError [] = Nothing
|
|
||||||
findFirstError (ErrorResponse desc : _) = Just desc
|
|
||||||
findFirstError (_ : xs) = findFirstError xs
|
|
||||||
|
|
||||||
-- Collects all messages received before ReadyForQuery
|
|
||||||
collectBeforeReadyForQuery :: Connection -> IO (Either Error [ServerMessage])
|
|
||||||
collectBeforeReadyForQuery conn = do
|
|
||||||
msg <- readChan $ connOutAllChan conn
|
|
||||||
case msg of
|
|
||||||
Left e -> pure $ Left e
|
|
||||||
Right ReadForQuery{} -> pure $ Right []
|
|
||||||
Right m -> fmap (m:) <$> collectBeforeReadyForQuery conn
|
|
||||||
|
|
||||||
-- | Public
|
-- | Public
|
||||||
describeStatement
|
describeStatement
|
||||||
:: Connection
|
:: Connection
|
||||||
-> B.ByteString
|
-> B.ByteString
|
||||||
-> IO (Either Error (V.Vector Oid, V.Vector FieldDescription))
|
-> IO (Either Error (V.Vector Oid, V.Vector FieldDescription))
|
||||||
describeStatement conn stmt = do
|
describeStatement conn stmt = do
|
||||||
sendEncode s $
|
sendEncode conn $
|
||||||
encodeClientMessage (Parse sname (StatementSQL stmt) V.empty)
|
encodeClientMessage (Parse sname (StatementSQL stmt) V.empty)
|
||||||
<> encodeClientMessage (DescribeStatement sname)
|
<> encodeClientMessage (DescribeStatement sname)
|
||||||
<> encodeClientMessage Sync
|
<> encodeClientMessage Sync
|
||||||
(parseMessages =<<) <$> collectBeforeReadyForQuery conn
|
(parseMessages =<<) <$> collectUntilReadyForQuery conn
|
||||||
where
|
where
|
||||||
s = connRawConnection conn
|
|
||||||
sname = StatementName ""
|
sname = StatementName ""
|
||||||
parseMessages msgs = case msgs of
|
parseMessages msgs = case msgs of
|
||||||
[ParameterDescription params, NoData]
|
[ParameterDescription params, NoData]
|
||||||
@ -119,7 +103,21 @@ describeStatement conn stmt = do
|
|||||||
[ParameterDescription params, RowDescription fields]
|
[ParameterDescription params, RowDescription fields]
|
||||||
-> Right (params, fields)
|
-> Right (params, fields)
|
||||||
xs -> Left . maybe
|
xs -> Left . maybe
|
||||||
(DecodeError "Unexpected response on describe query")
|
(DecodeError "Unexpected response on a describe query")
|
||||||
PostgresError
|
PostgresError
|
||||||
$ findFirstError xs
|
$ findFirstError xs
|
||||||
|
|
||||||
|
-- Collects all messages preceding `ReadyForQuery`
|
||||||
|
collectUntilReadyForQuery :: Connection -> IO (Either Error [ServerMessage])
|
||||||
|
collectUntilReadyForQuery conn = do
|
||||||
|
msg <- readChan $ connOutAllChan conn
|
||||||
|
case msg of
|
||||||
|
Left e -> pure $ Left e
|
||||||
|
Right ReadForQuery{} -> pure $ Right []
|
||||||
|
Right m -> fmap (m:) <$> collectUntilReadyForQuery conn
|
||||||
|
|
||||||
|
findFirstError :: [ServerMessage] -> Maybe ErrorDesc
|
||||||
|
findFirstError [] = Nothing
|
||||||
|
findFirstError (ErrorResponse desc : _) = Just desc
|
||||||
|
findFirstError (_ : xs) = findFirstError xs
|
||||||
|
|
||||||
|
@ -58,7 +58,7 @@ testBatch = withConnection $ \c -> do
|
|||||||
let a = "5"
|
let a = "5"
|
||||||
b = "3"
|
b = "3"
|
||||||
sendBatchAndSync c [makeQuery1 a, makeQuery1 b]
|
sendBatchAndSync c [makeQuery1 a, makeQuery1 b]
|
||||||
readReadyForQuery c
|
waitReadyForQuery c
|
||||||
|
|
||||||
r1 <- readNextData c
|
r1 <- readNextData c
|
||||||
r2 <- readNextData c
|
r2 <- readNextData c
|
||||||
@ -77,7 +77,7 @@ testTwoBatches = withConnection $ \c -> do
|
|||||||
|
|
||||||
sendBatchAndSync c [makeQuery2 r1 r2]
|
sendBatchAndSync c [makeQuery2 r1 r2]
|
||||||
r <- readNextData c
|
r <- readNextData c
|
||||||
readReadyForQuery c
|
waitReadyForQuery c
|
||||||
|
|
||||||
BS.pack (show $ a + b) @=? fromMessage r
|
BS.pack (show $ a + b) @=? fromMessage r
|
||||||
|
|
||||||
@ -93,7 +93,7 @@ testMultipleBatches = withConnection $ replicateM_ 10 . assertSingleBatch
|
|||||||
a @=? fromMessage r1
|
a @=? fromMessage r1
|
||||||
r2 <- readNextData c
|
r2 <- readNextData c
|
||||||
b @=? fromMessage r2
|
b @=? fromMessage r2
|
||||||
readReadyForQuery c
|
waitReadyForQuery c
|
||||||
|
|
||||||
-- | Query is empty string.
|
-- | Query is empty string.
|
||||||
testEmptyQuery :: IO ()
|
testEmptyQuery :: IO ()
|
||||||
@ -110,7 +110,7 @@ assertQueryNoData :: Query -> IO ()
|
|||||||
assertQueryNoData q = withConnection $ \c -> do
|
assertQueryNoData q = withConnection $ \c -> do
|
||||||
sendBatchAndSync c [q]
|
sendBatchAndSync c [q]
|
||||||
r <- fromRight <$> readNextData c
|
r <- fromRight <$> readNextData c
|
||||||
readReadyForQuery c
|
waitReadyForQuery c
|
||||||
DataMessage [] @=? r
|
DataMessage [] @=? r
|
||||||
|
|
||||||
-- | Asserts that all the received data rows are in form (Right _)
|
-- | Asserts that all the received data rows are in form (Right _)
|
||||||
@ -144,7 +144,7 @@ testInvalidBatch = do
|
|||||||
where
|
where
|
||||||
assertInvalidBatch desc qs = withConnection $ \c -> do
|
assertInvalidBatch desc qs = withConnection $ \c -> do
|
||||||
sendBatchAndSync c qs
|
sendBatchAndSync c qs
|
||||||
readReadyForQuery c
|
waitReadyForQuery c
|
||||||
checkInvalidResult c $ length qs
|
checkInvalidResult c $ length qs
|
||||||
|
|
||||||
-- | Describes usual statement.
|
-- | Describes usual statement.
|
||||||
@ -189,14 +189,14 @@ testSimpleAndExtendedQuery = withConnection $ \c -> do
|
|||||||
b = "2"
|
b = "2"
|
||||||
d = "5"
|
d = "5"
|
||||||
sendBatchAndSync c [ makeQuery1 a , makeQuery1 b]
|
sendBatchAndSync c [ makeQuery1 a , makeQuery1 b]
|
||||||
readReadyForQuery c
|
waitReadyForQuery c
|
||||||
checkRightResult c 2
|
checkRightResult c 2
|
||||||
|
|
||||||
rs <- sendSimpleQuery c "SELECT * FROM generate_series(1, 10)"
|
rs <- sendSimpleQuery c "SELECT * FROM generate_series(1, 10)"
|
||||||
assertBool "Should be Right" $ isRight rs
|
assertBool "Should be Right" $ isRight rs
|
||||||
|
|
||||||
sendBatchAndSync c [makeQuery1 d]
|
sendBatchAndSync c [makeQuery1 d]
|
||||||
fr <- readReadyForQuery c
|
fr <- waitReadyForQuery c
|
||||||
assertBool "Should be Right" $ isRight fr
|
assertBool "Should be Right" $ isRight fr
|
||||||
r <- fromMessage <$> readNextData c
|
r <- fromMessage <$> readNextData c
|
||||||
r @=? d
|
r @=? d
|
||||||
@ -209,7 +209,7 @@ testPreparedStatementCache = withConnection $ \c -> do
|
|||||||
sendBatchAndSync c [ makeQuery1 (BS.pack (show a))
|
sendBatchAndSync c [ makeQuery1 (BS.pack (show a))
|
||||||
, makeQuery1 (BS.pack (show b))
|
, makeQuery1 (BS.pack (show b))
|
||||||
, makeQuery2 (BS.pack (show a)) (BS.pack (show b))]
|
, makeQuery2 (BS.pack (show a)) (BS.pack (show b))]
|
||||||
readReadyForQuery c
|
waitReadyForQuery c
|
||||||
r1 <- fromMessage <$> readNextData c
|
r1 <- fromMessage <$> readNextData c
|
||||||
r2 <- fromMessage <$> readNextData c
|
r2 <- fromMessage <$> readNextData c
|
||||||
r3 <- fromMessage <$> readNextData c
|
r3 <- fromMessage <$> readNextData c
|
||||||
@ -226,12 +226,11 @@ testPreparedStatementCache = withConnection $ \c -> do
|
|||||||
testLargeQuery :: IO ()
|
testLargeQuery :: IO ()
|
||||||
testLargeQuery = withConnection $ \c -> do
|
testLargeQuery = withConnection $ \c -> do
|
||||||
sendBatchAndSync c [Query largeStmt V.empty Text Text NeverCache ]
|
sendBatchAndSync c [Query largeStmt V.empty Text Text NeverCache ]
|
||||||
readReadyForQuery c
|
waitReadyForQuery c
|
||||||
r <- readNextData c
|
r <- readNextData c
|
||||||
assertBool "Should be Right" $ isRight r
|
assertBool "Should be Right" $ isRight r
|
||||||
where
|
where
|
||||||
largeStmt = "select typname, typnamespace, typowner, typlen, typbyval,"
|
largeStmt = "select typname, typnamespace, typowner, typlen, typbyval,"
|
||||||
<> "typcategory, typispreferred, typisdefined, typdelim,"
|
<> "typcategory, typispreferred, typisdefined, typdelim,"
|
||||||
<> "typrelid, typelem, typarray from pg_type "
|
<> "typrelid, typelem, typarray from pg_type "
|
||||||
<> "where typtypmod = -1 and typisdefined = true"
|
|
||||||
|
|
||||||
|
@ -35,7 +35,7 @@ testSimpleQuery = withConnectionAll $ \c -> do
|
|||||||
<> "SELECT * FROM a;"
|
<> "SELECT * FROM a;"
|
||||||
<> "DROP TABLE a;"
|
<> "DROP TABLE a;"
|
||||||
sendMessage rawConn $ SimpleQuery statement
|
sendMessage rawConn $ SimpleQuery statement
|
||||||
msgs <- collectBeforeReadyForQuery c
|
msgs <- collectUntilReadyForQuery c
|
||||||
assertNoErrorResponse msgs
|
assertNoErrorResponse msgs
|
||||||
assertContains msgs isCommandComplete "Command complete"
|
assertContains msgs isCommandComplete "Command complete"
|
||||||
where
|
where
|
||||||
@ -60,7 +60,7 @@ testExtendedQuery = withConnectionAll $ \c -> do
|
|||||||
sendMessage rawConn Flush
|
sendMessage rawConn Flush
|
||||||
sendMessage rawConn Sync
|
sendMessage rawConn Sync
|
||||||
|
|
||||||
msgs <- collectBeforeReadyForQuery c
|
msgs <- collectUntilReadyForQuery c
|
||||||
assertNoErrorResponse msgs
|
assertNoErrorResponse msgs
|
||||||
assertContains msgs isBindComplete "BindComplete"
|
assertContains msgs isBindComplete "BindComplete"
|
||||||
assertContains msgs isCloseComplete "CloseComplete"
|
assertContains msgs isCloseComplete "CloseComplete"
|
||||||
@ -91,7 +91,7 @@ testExtendedEmptyQuery :: IO ()
|
|||||||
testExtendedEmptyQuery = withConnectionAll $ \c -> do
|
testExtendedEmptyQuery = withConnectionAll $ \c -> do
|
||||||
let query = Query "" V.empty Text Text NeverCache
|
let query = Query "" V.empty Text Text NeverCache
|
||||||
sendBatchAndSync c [query]
|
sendBatchAndSync c [query]
|
||||||
msgs <- collectBeforeReadyForQuery c
|
msgs <- collectUntilReadyForQuery c
|
||||||
assertNoErrorResponse msgs
|
assertNoErrorResponse msgs
|
||||||
assertContains msgs isEmptyQueryResponse "EmptyQueryResponse"
|
assertContains msgs isEmptyQueryResponse "EmptyQueryResponse"
|
||||||
where
|
where
|
||||||
@ -109,7 +109,7 @@ testExtendedQueryNoData = withConnectionAll $ \c -> do
|
|||||||
sendMessage rawConn $ DescribeStatement sname
|
sendMessage rawConn $ DescribeStatement sname
|
||||||
sendMessage rawConn Sync
|
sendMessage rawConn Sync
|
||||||
|
|
||||||
msgs <- collectBeforeReadyForQuery c
|
msgs <- collectUntilReadyForQuery c
|
||||||
assertContains msgs isNoData "NoData"
|
assertContains msgs isNoData "NoData"
|
||||||
where
|
where
|
||||||
isNoData NoData = True
|
isNoData NoData = True
|
||||||
|
Loading…
Reference in New Issue
Block a user