Switch from transformer-based resource management to IO due to potential concurrency issues

This commit is contained in:
Nikita Volkov 2014-08-05 14:18:57 +04:00
parent 268acdafdc
commit 399d5172ae

View File

@ -10,18 +10,46 @@ import qualified ListT
-- * Session
-------------------------
-- |
-- A session, in which transactions are executed.
-- It maintains a shared state between transactions.
newtype S m r =
S (ReaderT (Pool.Pool Backend.Connection) (EitherT SessionError m) r)
deriving (Functor, Applicative, Monad, MonadIO)
data Session =
Session !(Pool.Pool Backend.Connection)
-- |
-- Run the session monad transformer in the base monad.
session :: Backend.Backend -> S m r -> m r
session b s =
$notImplemented
-- Session settings.
data Settings =
Settings {
-- |
-- The number of stripes (distinct sub-pools) to maintain.
-- The smallest acceptable value is 1.
striping1 :: Word32,
-- |
-- Maximum number of connections to keep open per stripe.
-- The smallest acceptable value is 1.
-- Requests for connections will block if this limit is reached
-- on a single stripe,
-- even if other stripes have idle connections available.
striping2 :: Word32,
-- |
-- The amount of time for which an unused resource is kept open.
-- The smallest acceptable value is 0.5 seconds.
-- The elapsed time before destroying a resource
-- may be a little longer than requested,
-- as the reaper thread wakes at 1-second intervals.
connectionTimeout :: NominalDiffTime
}
withSession :: Backend.Backend -> Settings -> (Session -> IO a) -> IO a
withSession b s =
bracket acquire release
where
acquire =
do
pool <-
Pool.createPool
(Backend.connect b) (Backend.disconnect) (striping1 s)
(connectionTimeout s) (striping2 s)
return (Session pool)
release (Session pool) =
Pool.purgePool pool
-- ** Error
@ -29,6 +57,7 @@ session b s =
data SessionError =
TransactionError TransactionError
deriving (Show, Typeable)
-- * Transaction
@ -37,23 +66,33 @@ data SessionError =
-- |
-- A transaction with a level @l@ and a result @r@.
newtype T l s r =
T (ReaderT Backend.Connection (EitherT TransactionError IO) r)
T (CompositionT.T (ReaderT Backend.Connection (EitherT TransactionError IO)) r)
deriving (Functor, Applicative, Monad)
transaction :: MonadIO m => (forall s. T l s r) -> S m r
transaction t =
transaction :: Session -> (forall s. T l s r) -> IO r
transaction (Session p) t =
do
pool <- S $ ask
e <-
liftIO $ Pool.withResource pool $ \c ->
runEitherT $ runReaderT (case t of T r -> r) c
either (S . lift . left . TransactionError) return e
e <-
Pool.withResource p $ runEitherT . \c ->
case composed of
False ->
runReaderT r c
True ->
do
$notImplemented
either throwIO return e
where
(composed, r) = case t of T t -> CompositionT.run t
-- ** Error
-------------------------
data TransactionError
data TransactionError =
TE
deriving (Show, Typeable)
instance Exception TransactionError
-- * Levels
@ -61,19 +100,19 @@ data TransactionError
data Read
read :: MonadIO m => (forall s. T Read s r) -> S m r
read :: MonadIO m => Session -> (forall s. T Read s r) -> IO r
read = transaction
data Write
write :: MonadIO m => (forall s. T Write s r) -> S m r
write :: MonadIO m => Session -> (forall s. T Write s r) -> IO r
write = transaction
data Admin
admin :: MonadIO m => (forall s. T Admin s r) -> S m r
admin :: MonadIO m => Session -> (forall s. T Admin s r) -> IO r
admin = transaction
@ -98,7 +137,7 @@ class UpdatePrivilege l
instance UpdatePrivilege Write
update :: UpdatePrivilege l => Statement -> T l s ()
update :: UpdatePrivilege l => Statement -> T l s (Maybe Integer)
update =
$notImplemented