Move the streaming functions out of the base monad

This commit is contained in:
Nikita Volkov 2014-10-23 20:33:31 +04:00
parent 81c9d9996c
commit d946f442d9

View File

@ -17,15 +17,13 @@ module Hasql
QQ.q,
-- * Statement Execution
StatementTx,
unitTx,
countTx,
streamTx,
cursorStreamTx,
-- * Results Stream
TxStream,
TxStreamT,
TxListT,
-- * Row parser
RowParser.RowParser(..),
@ -187,24 +185,21 @@ runTx connection mode (Tx reader) =
-- |
-- A stream of results,
-- which fetches only those that you reach.
type TxStream b s r =
TxStreamT s (Tx b s) r
-- |
-- A wrapper around 'ListT.ListT',
--
-- It's a wrapper around 'ListT.ListT',
-- which uses the same trick as the 'ST' monad to associate with the
-- context transaction and become impossible to be used outside of it.
-- This lets the library ensure that it is safe to automatically
-- release all the resources associated with this stream.
--
-- All the functions of the \"list-t\" library are applicable to this type,
-- amongst which are 'ListT.fold', 'ListT.traverse_', 'ListT.toList'.
newtype TxStreamT s m r =
TxStreamT (ListT.ListT m r)
-- amongst which are 'ListT.head', 'ListT.toList', 'ListT.fold', 'ListT.traverse_'.
newtype TxListT s m r =
TxListT (ListT.ListT m r)
deriving (Functor, Applicative, Alternative, Monad, MonadTrans, MonadPlus,
Monoid, ListT.ListMonad)
instance ListT.ListTrans (TxStreamT s) where
instance ListT.ListTrans (TxListT s) where
uncons =
unsafeCoerce
(ListT.uncons :: ListT.ListT m r -> m (Maybe (r, ListT.ListT m r)))
@ -238,22 +233,16 @@ instance Exception Error
-- * Statements execution
-------------------------
-- |
-- A function executing a statement in a transaction.
type StatementTx b s r =
Backend b =>
Backend.Statement b -> Tx b s r
-- |
-- Execute a statement, which produces no result.
unitTx :: StatementTx b s ()
unitTx :: Backend b => Backend.Statement b -> Tx b s ()
unitTx s =
Tx $ ReaderT $ Backend.execute s
-- |
-- Execute a statement and count the amount of affected rows.
-- Useful for resolving how many rows were updated or deleted.
countTx :: (Backend.Mapping b Word64) => StatementTx b s Word64
countTx :: Backend b => Backend.Mapping b Word64 => Backend.Statement b -> Tx b s Word64
countTx s =
Tx $ ReaderT $ Backend.executeAndCountEffects s
@ -262,27 +251,29 @@ countTx s =
-- which produces a results stream:
-- a @SELECT@ or an @INSERT@,
-- which produces a generated value (e.g., an auto-incremented id).
streamTx :: RowParser b r => StatementTx b s (TxStream b s r)
streamTx :: Backend b => RowParser b r => Backend.Statement b -> TxListT s (Tx b s) r
streamTx s =
Tx $ ReaderT $ \c -> do
fmap hoistBackendStream $ Backend.executeAndStream s c
do
r <- lift $ Tx $ ReaderT $ \c -> Backend.executeAndStream s c
hoistBackendStream r
-- |
-- Execute a @SELECT@ statement
-- and produce a results stream,
-- which utilizes a database cursor.
-- This function allows you to fetch virtually limitless results in a constant memory.
cursorStreamTx :: (RowParser b r) => StatementTx b s (TxStream b s r)
cursorStreamTx :: Backend b => RowParser b r => Backend.Statement b -> TxListT s (Tx b s) r
cursorStreamTx s =
Tx $ ReaderT $ \c -> do
fmap hoistBackendStream $ Backend.executeAndStreamWithCursor s c
do
r <- lift $ Tx $ ReaderT $ \c -> Backend.executeAndStreamWithCursor s c
hoistBackendStream r
-- * Helpers
-------------------------
hoistBackendStream :: RowParser b r => Backend.ResultsStream b -> TxStream b s r
hoistBackendStream :: RowParser b r => Backend.ResultsStream b -> TxListT s (Tx b s) r
hoistBackendStream (w, s) =
TxStreamT $ hoist (Tx . lift) $ do
TxListT $ hoist (Tx . lift) $ do
row <- ($ s) $ ListT.slice $ fromMaybe ($bug "Invalid row width") $ ListT.positive w
either (lift . throwIO . ResultParsingError) return $ RowParser.parseRow row