New API with example
This commit is contained in:
Normal file
Normal file
@ -0,0 +1,40 @@
main =
H.withPool $ \pool -> do
artistID <-
join $ fmap ListT.head $ H.runExecutor pool $ H.selectExecutor $
[H.q| SELECT id FROM artists WHERE name = ? |]
userID <-
join $ fmap ListT.head $ H.runExecutor pool $ H.selectExecutor $
[H.q| SELECT id FROM users WHERE name = ? |]
"Nikita Volkov"
main =
H.withPool $ \pool -> do
H.runExecutor pool $ do
H.writeTransactionExecutor H.Serialized $ do
artistIDMaybe <-
H.selectTransaction $
[H.q| SELECT id FROM artists WHERE name = ? |]
userIDMaybe <-
H.selectTransaction $
[H.q| SELECT id FROM users WHERE name = ? |]
"Nikita Volkov"
forM_ ((,) <$> artistIDMaybe <*> userIDMaybe) $ \(artistID, userID) -> do
H.insertTransaction $
[H.q| INSERT INTO artists_fans (artist_id, user_id) VALUES (?, ?) |]
@ -40,7 +40,7 @@ library
@ -3,17 +3,24 @@ module HighSQL.API where
import HighSQL.Prelude hiding (read, Read, write, Write, Error)
import qualified Data.Pool as Pool
import qualified HighSQL.Backend as Backend
import qualified HighSQL.Row as Row
import qualified HighSQL.RowParser as RowParser
import qualified ListT
type Backend =
type RowParser =
-- * Pool
-- |
-- A pool of connections to the database.
newtype Pool b =
Pool (Pool.Pool (Backend.Connection b))
type Pool b =
Pool.Pool (Backend.Connection b)
-- |
-- Pool initization settings.
@ -40,9 +47,9 @@ data Settings =
-- Initialize a pool given a backend and settings
-- and run an IO computation with it,
-- while automating the resource management.
withPool :: Backend.Backend b => b -> Settings -> (Pool b -> IO a) -> IO a
withPool :: Backend b => b -> Settings -> (Pool b -> IO a) -> IO a
withPool b s =
bracket acquire release
bracket acquire Pool.purgePool
acquire =
@ -50,9 +57,8 @@ withPool b s =
(Backend.connect b) (Backend.disconnect)
(striping1 s) (connectionTimeout s) (striping2 s)
return (Pool pool)
release (Pool pool) =
Pool.purgePool pool
return pool
-- * Error
@ -63,7 +69,7 @@ data Error =
-- |
-- Cannot connect to a server
-- or the connection got interrupted.
Disconnected Text |
ConnectionLost Text |
-- |
-- Attempt to parse a statement execution result into an incompatible type.
-- Indicates either a mismatching schema or an incorrect query.
@ -73,6 +79,67 @@ data Error =
instance Exception Error
-- * Executors
-- |
-- A connections 'Pool' context monad.
type Executor b =
ReaderT (Pool b) IO
executorIO :: Pool b -> Executor b r -> IO r
executorIO =
flip runReaderT
writeTransactionExecutor ::
Backend b =>
Backend.IsolationLevel -> (forall s. Transaction b Write s r) -> Executor b r
writeTransactionExecutor isolation (Transaction t) =
withConnectionExecutor $ \c ->
Backend.inTransaction (isolation, True) (runReaderT t c) c
streamingExecutor ::
forall b r.
Backend b => RowParser b r => Typeable r =>
Backend.Statement b -> Executor b (ListT (Executor b) r)
streamingExecutor s =
withConnectionExecutor $ \c -> do
(w, s) <- Backend.executeAndStream s c
return $ do
row <- hoist lift $ replicateM w s
maybe (lift $ lift $ throwIO parsingError) return $ RowParser.parse row
parsingError =
ResultParsingError (fst s) (typeOf (undefined :: r))
countingExecutor ::
Backend b =>
Backend.Statement b -> Executor b Integer
countingExecutor s =
withConnectionExecutor $ \c -> do
Backend.executeAndCountEffects s c
generatingExecutor ::
Backend b => Backend.Mapping b Integer =>
Backend.Statement b -> Executor b (Maybe Integer)
generatingExecutor s =
streamingExecutor s >>= ListT.head
unitExecutor ::
Backend b =>
Backend.Statement b -> Executor b ()
unitExecutor s =
withConnectionExecutor $ \c -> do
Backend.execute s c
withConnectionExecutor ::
(Backend.Connection b -> IO r) -> Executor b r
withConnectionExecutor f =
ReaderT $ \pool -> Pool.withResource pool $ \c -> handle handler $ f c
handler =
Backend.ConnectionLost m -> throwIO $ ConnectionLost m
-- * Transaction
@ -86,179 +153,28 @@ newtype Transaction b l s r =
Transaction (ReaderT (Backend.Connection b) IO r)
deriving (Functor, Applicative, Monad)
streamingTransaction ::
Backend b => RowParser b r => ReadingPrivilege l =>
Backend.Statement b -> Transaction b l s (ListT (Transaction b l s) r)
streamingTransaction statement =
Transaction $ ReaderT $ \connection ->
-- |
-- Execute a transaction using a connections pool.
-- Do it in the atomic mode if the first flag is true
-- and in the write mode if the second one is true.
-- Perform a select, while utilizing the database cursors functionality,
-- which allows to query for virtually unlimited result sets
-- in constant memory by utilizing streaming.
-- * Automatically retries the transaction in case of a
-- 'Backend.TransactionConflict' exception.
-- However using this function for small result sets isn't beneficial,
-- since it introduces a small overhead due to bookkeeping related to cursors.
-- * Rethrows all the other exceptions.
transaction :: (Backend.Backend b) => Bool -> Bool -> Pool b -> (forall s. Transaction b l s r) -> IO r
transaction a w (Pool p) (Transaction t) =
e <- try $ Pool.withResource p $ loop
case e of
Left (Backend.Disconnected t) ->
throwIO (Disconnected t)
Left (Backend.TransactionConflict) ->
$bug "Unexpected TransactionConflict"
Right r ->
return r
loop c =
Backend.beginTransaction a w c
e <- try $ runReaderT t c
case e of
Left Backend.TransactionConflict ->
Backend.finishTransaction False c
loop c
Left e ->
Backend.finishTransaction False c
throwIO e
Right r ->
Backend.finishTransaction True c
return r
-- ** Levels
data Read
-- -- |
-- -- Execute a transaction on a connections pool.
-- --
-- -- Requires minimal locking from the database,
-- -- however you can only execute the \"SELECT\" statements in it.
-- -- The API ensures of that on the type-level.
-- read :: Pool -> (forall s. Transaction Read s r) -> IO r
-- read = transaction False
data Write
-- -- |
-- -- Execute a transaction on a connections pool.
-- --
-- -- Allows to execute the \"SELECT\", \"UPDATE\", \"INSERT\"
-- -- and \"DELETE\" statements.
-- -- However, compared to 'read', this transaction requires the database to choose
-- -- a more resource-demanding locking strategy.
-- write :: Pool -> (forall s. Transaction Write s r) -> IO r
-- write = transaction True
data Admin
-- -- |
-- -- Execute a transaction on a connections pool.
-- --
-- -- Same as 'write', but allows you to perform any kind of statements,
-- -- including \"CREATE\", \"DROP\" and \"ALTER\".
-- admin :: Pool -> (forall s. Transaction Admin s r) -> IO r
-- admin = transaction True
-- ** Privileges
-- |
-- Produce a results stream from the statement.
select ::
forall b l s r.
(SelectPrivilege l, Row.Row b r, Backend.Backend b, Typeable r) =>
Statement b -> ResultsStream s (Transaction b l s) r
select (bs, vl) =
(w, s) <-
lift $ Transaction $ do
connection <- ask
liftIO $ do
Backend.executeStreaming bs vl Nothing connection
l <- ResultsStream $ hoist (Transaction . liftIO) $ replicateM w s
maybe throwParsingError return $ Row.parseResults l
throwParsingError =
ResultsStream $ lift $ Transaction $ liftIO $ throwIO $
ResultParsingError bs (typeOf (undefined :: r))
-- |
-- \"SELECT\"
class SelectPrivilege l
instance SelectPrivilege Read
instance SelectPrivilege Write
instance SelectPrivilege Admin
-- |
-- \"UPDATE\", \"INSERT\", \"DELETE\"
class UpdatePrivilege l
instance UpdatePrivilege Write
instance UpdatePrivilege Admin
-- |
-- Execute and count the amount of affected rows.
update :: (UpdatePrivilege l, Backend.Backend b) => Statement b -> Transaction b l s Integer
update (bs, vl) =
Transaction $ do
connection <- ask
liftIO $ do
Backend.executeCountingEffects bs vl connection
-- |
-- Execute and return the possibly auto-incremented number.
insert :: (UpdatePrivilege l, Backend.Backend b, Backend.Mapping b Integer) => Statement b -> Transaction b l s (Maybe Integer)
insert (bs, vl) =
Transaction $ do
connection <- ask
liftIO $ do
(w, l) <- Backend.executeStreaming bs vl (Just 1) connection
case w of
1 -> do
traverse (maybe throwParsingError return . Row.parseResults . pure) =<< ListT.head l
_ -> $bug "Unexpected result"
throwParsingError =
liftIO $ throwIO $ ResultParsingError bs (typeOf (undefined :: Integer))
-- |
-- \"CREATE\", \"ALTER\", \"DROP\", \"TRUNCATE\"
class CreatePrivilege l
instance CreatePrivilege Admin
create :: (CreatePrivilege l, Backend.Backend b) => Statement b -> Transaction b l s ()
create (bs, vl) =
Transaction $ do
connection <- ask
liftIO $ do
Backend.execute bs vl connection
-- * Statement
type Statement b =
(ByteString, [Backend.StatementArgument b])
mkStatement :: forall b. ByteString -> [Value b] -> Statement b
mkStatement sql values =
(,) sql (map renderValue values)
renderValue (Value v) =
Backend.renderValue v :: Backend.StatementArgument b
data Value b =
forall v. (Backend.Mapping b v) => Value !v
-- All resources are automatically managed
-- and get released on transaction finish.
streamingWithCursorTransaction ::
Backend b => RowParser b r => ReadingPrivilege l =>
Backend.Statement b -> Transaction b l s (TransactionListT s (Transaction b l s) r)
streamingWithCursorTransaction =
-- * Results Stream
@ -277,12 +193,115 @@ data Value b =
-- Hence you can only access it while remaining in a transaction,
-- and, when the transaction finishes,
-- all the acquired resources get automatically released.
newtype ResultsStream s m r =
ResultsStream (ListT.ListT m r)
newtype TransactionListT s m r =
TransactionListT (ListT.ListT m r)
deriving (Functor, Applicative, Alternative, Monad, MonadTrans, MonadPlus,
Monoid, ListT.ListMonad)
instance ListT.ListTrans (ResultsStream s) where
instance ListT.ListTrans (TransactionListT s) where
uncons =
(ListT.uncons :: ListT.ListT m r -> m (Maybe (r, ListT.ListT m r)))
-- ** Levels
-- |
-- Requires minimal locking from the database,
-- however you can only execute the \"SELECT\" statements in it.
data Read
-- |
-- Allows you to perform any kind of statements,
-- including \"SELECT\", \"UPDATE\", \"INSERT\", \"DELETE\",
-- \"CREATE\", \"DROP\" and \"ALTER\".
-- However, compared to 'Read',
-- this transaction level requires the database to choose
-- a more resource-demanding locking strategy.
data Write
-- ** Privileges
-- |
-- \"SELECT\"
class ReadingPrivilege l
instance ReadingPrivilege Read
instance ReadingPrivilege Write
-- |
-- \"UPDATE\", \"INSERT\", \"DELETE\",
-- \"CREATE\", \"ALTER\", \"DROP\", \"TRUNCATE\"
class ModificationPrivilege l
instance ModificationPrivilege Write
-- * Statement
-- type Backend.Statement b =
-- (ByteString, [Backend.StatementArgument b])
-- data Backend.Statement b =
-- Statement !ByteString ![Backend.StatementArgument b]
-- mkStatement :: forall b. ByteString -> [Value b] -> Backend.Statement b
-- mkStatement sql values =
-- (,) sql (map renderValue values)
-- where
-- renderValue (Value v) =
-- Backend.renderValue v :: Backend.StatementArgument b
-- data Value b =
-- forall v. (Backend.Mapping b v) => Value !v
-- * Aliases
-- |
-- A short form alias to 'Executor'.
type E b =
Executor b
-- |
-- A short form alias to 'executorIO'.
runE :: Pool b -> E b r -> IO r
runE = executorIO
-- |
-- A short form alias to 'writeTransactionExecutor'.
writeE :: Backend b => Backend.IsolationLevel -> (forall s. T b W s r) -> E b r
writeE = writeTransactionExecutor
-- |
-- A short form alias to 'streamingExecutor'.
streamingE :: Typeable r => Backend b => RowParser b r => S b -> E b (ListT (E b) r)
streamingE = streamingExecutor
-- |
-- A short form alias to 'streamingTransaction'.
streamingT :: Backend b => RowParser b r => ReadingPrivilege l => S b -> T b l s (ListT (T b l s) r)
streamingT = streamingTransaction
-- |
-- A short form alias to 'Transaction'.
type T =
type W =
type R =
type TListT =
type S b =
Backend.Statement b
@ -6,22 +6,45 @@ import HighSQL.Prelude
import qualified Language.Haskell.TH as TH
data TransactionError =
-- |
-- The transaction failed and should be retried.
TransactionConflict |
data BackendError =
-- -- |
-- -- The transaction failed and should be retried.
-- TransactionConflict |
-- |
-- The connection got interrupted.
Disconnected Text
ConnectionLost Text
deriving (Show, Typeable)
instance Exception TransactionError
instance Exception BackendError
-- |
-- For reference see
-- <https://en.wikipedia.org/wiki/Isolation_(database_systems)#Isolation_levels the Wikipedia info>.
data IsolationLevel =
Serializable |
RepeatableReads |
ReadCommitted |
-- |
-- An isolation level and a boolean,
-- defining, whether the transaction will perform the "write" operations.
type TransactionMode =
(IsolationLevel, Bool)
-- |
-- A width of a row and a stream of serialized values.
type ResultsStream =
(Int, ListT IO ByteString)
type ResultsStream b =
(Int, ListT IO (Result b))
-- |
-- A template statement with values for placeholders.
type Statement b =
(ByteString, [StatementArgument b])
class Backend b where
@ -39,29 +62,32 @@ class Backend b where
-- Close the connection.
disconnect :: Connection b -> IO ()
-- |
-- Execute a statement with values for placeholders.
execute :: ByteString -> [StatementArgument b] -> Connection b -> IO ()
-- Execute a statement.
execute :: Statement b -> Connection b -> IO ()
-- |
-- Execute a statement with values for placeholders
-- and an expected results stream size.
-- The expected stream size can be used by the backend to determine
-- an optimal fetching method.
executeStreaming :: ByteString -> [StatementArgument b] -> Maybe Integer -> Connection b -> IO (Int, ListT IO (Result b))
-- Execute a statement
-- and stream the results.
executeAndStream :: Statement b -> Connection b -> IO (ResultsStream b)
-- |
-- Execute a statement with values for placeholders,
-- Execute a statement
-- and stream the results using a cursor.
-- This function will only be used from inside of transactions.
executeAndStreamWithCursor :: Statement b -> Connection b -> IO (ResultsStream b)
-- |
-- Execute a statement,
-- returning the amount of affected rows.
executeCountingEffects :: ByteString -> [StatementArgument b] -> Connection b -> IO Integer
-- |
-- Start a transaction in an atomic mode if the first flag is true
-- and in a write mode if the second one is true.
beginTransaction :: Bool -> Bool -> Connection b -> IO ()
-- |
-- Finish the transaction,
-- while releasing all the resources acquired with 'executeStreaming'.
-- The boolean defines whether to commit the updates,
-- otherwise it rolls back.
finishTransaction :: Bool -> Connection b -> IO ()
executeAndCountEffects :: Statement b -> Connection b -> IO Integer
-- -- |
-- -- Start a transaction in the specified mode.
-- beginTransaction :: TransactionMode -> Connection b -> IO ()
-- -- |
-- -- Finish the transaction,
-- -- while releasing all the resources acquired with 'executeAndStream'.
-- --
-- -- The boolean defines whether to commit the updates,
-- -- otherwise it rolls back.
-- finishTransaction :: Bool -> Connection b -> IO ()
inTransaction :: TransactionMode -> IO r -> Connection b -> IO r
class Backend b => Mapping b v where
@ -69,3 +95,4 @@ class Backend b => Mapping b v where
parseResult :: Result b -> Maybe v
@ -1,5 +1,5 @@
{-# LANGUAGE UndecidableInstances #-}
module HighSQL.Row where
module HighSQL.RowParser where
import HighSQL.Prelude
import Language.Haskell.TH
@ -7,14 +7,14 @@ import qualified Data.Text as Text
import qualified HighSQL.Backend as Backend
class Row b r where
parseResults :: [Backend.Result b] -> Maybe r
class RowParser b r where
parse :: [Backend.Result b] -> Maybe r
instance Row b () where
parseResults = \case [] -> Just (); _ -> Nothing
instance RowParser b () where
parse = \case [] -> Just (); _ -> Nothing
instance Backend.Mapping b v => Row b v where
parseResults = join . fmap (Backend.parseResult :: Backend.Result b -> Maybe v) . headMay
instance Backend.Mapping b v => RowParser b v where
parse = join . fmap (Backend.parseResult :: Backend.Result b -> Maybe v) . headMay
-- Generate tuple instaces using Template Haskell:
@ -31,9 +31,9 @@ let
constraints =
map (\t -> ClassP ''Backend.Mapping [backendType, t]) varTypes
head =
AppT (AppT (ConT ''Row) backendType) (foldl AppT (TupleT arity) varTypes)
AppT (AppT (ConT ''RowParser) backendType) (foldl AppT (TupleT arity) varTypes)
fromRowDec =
FunD 'parseResults [c1, c2]
FunD 'parse [c1, c2]
c1 =
Clause [ListP (map VarP varNames)] (NormalB e) []
