Unite the streaming executors and introduce a single

This commit is contained in:
Nikita Volkov 2014-10-23 20:46:54 +04:00
parent d946f442d9
commit 9baf77d9c0

View File

@ -19,8 +19,8 @@ module Hasql
-- * Statement Execution -- * Statement Execution
unitTx, unitTx,
countTx, countTx,
singleTx,
streamTx, streamTx,
cursorStreamTx,
-- * Results Stream -- * Results Stream
TxListT, TxListT,
@ -248,32 +248,31 @@ countTx s =
-- | -- |
-- Execute a statement, -- Execute a statement,
-- which produces a results stream: -- which produces a single result row:
-- a @SELECT@ or an @INSERT@, -- a @SELECT@
-- which produces a generated value (e.g., an auto-incremented id). -- or an @INSERT@, which produces a generated value (e.g., an auto-incremented id).
streamTx :: Backend b => RowParser b r => Backend.Statement b -> TxListT s (Tx b s) r singleTx :: Backend b => RowParser b r => Backend.Statement b -> Tx b s (Maybe r)
streamTx s = singleTx s =
do ListT.head $ streamTx False s
r <- lift $ Tx $ ReaderT $ \c -> Backend.executeAndStream s c
hoistBackendStream r
-- | -- |
-- Execute a @SELECT@ statement -- Execute a @SELECT@ statement,
-- and produce a results stream, -- and produce a results stream.
-- which utilizes a database cursor. -- The boolean parameter specifies,
-- This function allows you to fetch virtually limitless results in a constant memory. -- whether to utilize a cursor.
cursorStreamTx :: Backend b => RowParser b r => Backend.Statement b -> TxListT s (Tx b s) r --
cursorStreamTx s = -- Cursor allows you to fetch virtually limitless results in a constant memory
-- at a cost of a small overhead.
-- Note that in most databases cursors require establishing a database transaction.
streamTx :: Backend b => RowParser b r => Bool -> Backend.Statement b -> TxListT s (Tx b s) r
streamTx cursor s =
do do
r <- lift $ Tx $ ReaderT $ \c -> Backend.executeAndStreamWithCursor s c r <- lift $ Tx $ ReaderT $ \c -> executor s c
hoistBackendStream r hoistBackendStream r
where
executor =
-- * Helpers if cursor then Backend.executeAndStreamWithCursor else Backend.executeAndStream
------------------------- hoistBackendStream (w, s) =
TxListT $ hoist (Tx . lift) $ do
hoistBackendStream :: RowParser b r => Backend.ResultsStream b -> TxListT s (Tx b s) r row <- ($ s) $ ListT.slice $ fromMaybe ($bug "Invalid row width") $ ListT.positive w
hoistBackendStream (w, s) = either (lift . throwIO . ResultParsingError) return $ RowParser.parseRow row
TxListT $ hoist (Tx . lift) $ do
row <- ($ s) $ ListT.slice $ fromMaybe ($bug "Invalid row width") $ ListT.positive w
either (lift . throwIO . ResultParsingError) return $ RowParser.parseRow row