Add the SyncUnlift strategy (#170)

This commit is contained in:
Andrzej Rybczak 2023-06-24 23:42:25 +02:00 committed by GitHub
parent 300d5afb28
commit 998c65452f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 166 additions and 13 deletions

View File

@ -5,6 +5,9 @@
easier to understand API.
* Add support for turning an effect handler into an effectful operation via the
`Provider` effect.
* Add the `SyncUnlift` strategy for cases where running unlifted computations in
different threads is an implementation detail and concurrency is not
observable from outside.
# effectful-core-2.2.2.2 (2023-03-13)
* Allow `inject` to turn a monomorphic effect stack into a polymorphic one.

View File

@ -64,6 +64,7 @@ library
, exceptions >= 0.10.4
, monad-control >= 1.0.3
, primitive >= 0.7.3.0
, stm >= 2.5
, transformers-base >= 0.4.6
, unliftio-core >= 0.2.0.1

View File

@ -40,6 +40,7 @@ module Effectful
-- ** Unlifting
, UnliftStrategy(..)
, SyncPolicy(..)
, Persistence(..)
, Limit(..)
, unliftStrategy

View File

@ -499,6 +499,9 @@ localUnlift (LocalEnv les) strategy k = case strategy of
SeqUnlift -> unsafeEff $ \es -> do
seqUnliftIO les $ \unlift -> do
(`unEff` es) $ k $ unsafeEff_ . unlift
SyncUnlift p -> unsafeEff $ \es -> do
syncUnliftIO les p $ \unlift -> do
(`unEff` es) $ k $ unsafeEff_ . unlift
ConcUnlift p l -> unsafeEff $ \es -> do
concUnliftIO les p l $ \unlift -> do
(`unEff` es) $ k $ unsafeEff_ . unlift
@ -514,6 +517,7 @@ localUnliftIO
-> Eff es a
localUnliftIO (LocalEnv les) strategy k = case strategy of
SeqUnlift -> liftIO $ seqUnliftIO les k
SyncUnlift p -> liftIO $ syncUnliftIO les p k
ConcUnlift p l -> liftIO $ concUnliftIO les p l k
----------------------------------------
@ -553,6 +557,9 @@ localLift !_ strategy k = case strategy of
SeqUnlift -> unsafeEff $ \es -> do
seqUnliftIO es $ \unlift -> do
(`unEff` es) $ k $ unsafeEff_ . unlift
SyncUnlift p -> unsafeEff $ \es -> do
syncUnliftIO es p $ \unlift -> do
(`unEff` es) $ k $ unsafeEff_ . unlift
ConcUnlift p l -> unsafeEff $ \es -> do
concUnliftIO es p l $ \unlift -> do
(`unEff` es) $ k $ unsafeEff_ . unlift
@ -642,6 +649,10 @@ localLiftUnlift (LocalEnv les) strategy k = case strategy of
seqUnliftIO es $ \unliftEs -> do
seqUnliftIO les $ \unliftLocalEs -> do
(`unEff` es) $ k (unsafeEff_ . unliftEs) (unsafeEff_ . unliftLocalEs)
SyncUnlift p -> unsafeEff $ \es -> do
syncUnliftIO es p $ \unliftEs -> do
syncUnliftIO les p $ \unliftLocalEs -> do
(`unEff` es) $ k (unsafeEff_ . unliftEs) (unsafeEff_ . unliftLocalEs)
ConcUnlift p l -> unsafeEff $ \es -> do
concUnliftIO es p l $ \unliftEs -> do
concUnliftIO les p l $ \unliftLocalEs -> do
@ -665,6 +676,7 @@ localLiftUnliftIO
-> Eff es a
localLiftUnliftIO (LocalEnv les) strategy k = case strategy of
SeqUnlift -> liftIO $ seqUnliftIO les $ k unsafeEff_
SyncUnlift p -> liftIO $ syncUnliftIO les p $ k unsafeEff_
ConcUnlift p l -> liftIO $ concUnliftIO les p l $ k unsafeEff_
----------------------------------------

View File

@ -20,6 +20,7 @@ module Effectful.Internal.Env
, cloneEnv
, restoreEnv
, sizeEnv
, syncVarEnv
, tailEnv
-- ** Modification of the effect stack
@ -37,6 +38,7 @@ module Effectful.Internal.Env
, modifyEnv
) where
import Control.Concurrent.STM.TVar
import Control.Monad
import Control.Monad.Primitive
import Data.Primitive.PrimArray
@ -84,6 +86,7 @@ data Storage = Storage
, stVersions :: !(MutablePrimArray RealWorld Int)
, stEffects :: !(SmallMutableArray RealWorld Any)
, stRelinkers :: !(SmallMutableArray RealWorld Any)
, stSyncVar :: !(TVar Bool)
}
----------------------------------------
@ -132,7 +135,7 @@ emptyEnv = Env 0
-- | Clone the environment to use it in a different thread.
cloneEnv :: Env es -> IO (Env es)
cloneEnv (Env offset refs storage0) = do
Storage storageSize version vs0 es0 fs0 <- readIORef' storage0
Storage storageSize version vs0 es0 fs0 _ <- readIORef' storage0
let vsSize = sizeofMutablePrimArray vs0
esSize = sizeofSmallMutableArray es0
fsSize = sizeofSmallMutableArray fs0
@ -143,7 +146,8 @@ cloneEnv (Env offset refs storage0) = do
vs <- cloneMutablePrimArray vs0 0 vsSize
es <- cloneSmallMutableArray es0 0 esSize
fs <- cloneSmallMutableArray fs0 0 fsSize
storage <- newIORef' $ Storage storageSize version vs es fs
syncVar <- newTVarIO False
storage <- newIORef' $ Storage storageSize version vs es fs syncVar
let relinkEffects = \case
0 -> pure ()
k -> do
@ -184,6 +188,12 @@ sizeEnv :: Env es -> IO Int
sizeEnv (Env offset refs _) = do
pure $ (sizeofPrimArray refs - offset) `div` 2
-- | Get the variable for the synchronized unlift.
--
-- @since 2.3.0.0
syncVarEnv :: Env es -> IO (TVar Bool)
syncVarEnv env = stSyncVar <$> readIORef' (envStorage env)
-- | Access the tail of the environment.
tailEnv :: Env (e : es) -> IO (Env es)
tailEnv (Env offset refs storage) = do
@ -348,7 +358,7 @@ getLocation (Env offset refs storage) = do
let i = offset + 2 * reifyIndex @e @es
ref = indexPrimArray refs i
version = indexPrimArray refs (i + 1)
Storage _ _ vs es _ <- readIORef' storage
Storage _ _ vs es _ _ <- readIORef' storage
storageVersion <- readPrimArray vs ref
-- If version of the reference is different than version in the storage, it
-- means that the effect in the storage is not the one that was initially
@ -367,6 +377,7 @@ emptyStorage = Storage 0 (noVersion + 1)
<$> newPrimArray 0
<*> newSmallArray 0 undefinedData
<*> newSmallArray 0 undefinedData
<*> newTVarIO False
-- | Insert an effect into the storage and return its reference.
insertEffect
@ -376,7 +387,7 @@ insertEffect
-> Relinker (EffectRep (DispatchOf e)) e
-> IO (Int, Int)
insertEffect storage e f = do
Storage size version vs0 es0 fs0 <- readIORef' storage
Storage size version vs0 es0 fs0 syncVar <- readIORef' storage
let len0 = sizeofSmallMutableArray es0
case size `compare` len0 of
GT -> error $ "size (" ++ show size ++ ") > len0 (" ++ show len0 ++ ")"
@ -384,7 +395,7 @@ insertEffect storage e f = do
writePrimArray vs0 size version
writeSmallArray' es0 size (toAny e)
writeSmallArray' fs0 size (toAny f)
writeIORef' storage $ Storage (size + 1) (version + 1) vs0 es0 fs0
writeIORef' storage $ Storage (size + 1) (version + 1) vs0 es0 fs0 syncVar
pure (size, version)
EQ -> do
let len = doubleCapacity len0
@ -397,20 +408,20 @@ insertEffect storage e f = do
writePrimArray vs size version
writeSmallArray' es size (toAny e)
writeSmallArray' fs size (toAny f)
writeIORef' storage $ Storage (size + 1) (version + 1) vs es fs
writeIORef' storage $ Storage (size + 1) (version + 1) vs es fs syncVar
pure (size, version)
-- | Given a reference to an effect from the top of the stack, delete it from
-- the storage.
deleteEffect :: IORef' Storage -> Int -> IO ()
deleteEffect storage ref = do
Storage size version vs es fs <- readIORef' storage
Storage size version vs es fs syncVar <- readIORef' storage
when (ref /= size - 1) $ do
error $ "ref (" ++ show ref ++ ") /= size - 1 (" ++ show (size - 1) ++ ")"
writePrimArray vs ref noVersion
writeSmallArray es ref undefinedData
writeSmallArray fs ref undefinedData
writeIORef' storage $ Storage (size - 1) version vs es fs
writeIORef' storage $ Storage (size - 1) version vs es fs syncVar
-- | Relink the environment to use the new storage.
relinkEnv :: IORef' Storage -> Env es -> IO (Env es)

View File

@ -40,6 +40,7 @@ module Effectful.Internal.Monad
-- * Unlifting
, UnliftStrategy(..)
, SyncPolicy(..)
, Persistence(..)
, Limit(..)
, unliftStrategy
@ -50,6 +51,7 @@ module Effectful.Internal.Monad
-- ** Low-level unlifts
, seqUnliftIO
, syncUnliftIO
, concUnliftIO
-- * Dispatch
@ -202,6 +204,7 @@ withEffToIO
-> Eff es a
withEffToIO strategy f = case strategy of
SeqUnlift -> unsafeEff $ \es -> seqUnliftIO es f
SyncUnlift p -> unsafeEff $ \es -> syncUnliftIO es p f
ConcUnlift p b -> unsafeEff $ \es -> concUnliftIO es p b f
-- | Create an unlifting function with the 'ConcUnlift' strategy.
@ -228,6 +231,19 @@ seqUnliftIO
-> IO a
seqUnliftIO es k = seqUnlift k es unEff
-- | Create an unlifting function with the 'SyncUnlift' strategy.
--
-- @since 2.3.0.0
syncUnliftIO
:: HasCallStack
=> Env es
-- ^ The environment.
-> SyncPolicy
-> ((forall r. Eff es r -> IO r) -> IO a)
-- ^ Continuation with the unlifting function in scope.
-> IO a
syncUnliftIO es policy k = syncUnlift policy k es unEff
-- | Create an unlifting function with the 'ConcUnlift' strategy.
concUnliftIO
:: HasCallStack
@ -415,6 +431,10 @@ raiseWith strategy k = case strategy of
es <- tailEnv ees
seqUnliftIO ees $ \unlift -> do
(`unEff` es) $ k $ unsafeEff_ . unlift
SyncUnlift p -> unsafeEff $ \ees -> do
es <- tailEnv ees
syncUnliftIO ees p $ \unlift -> do
(`unEff` es) $ k $ unsafeEff_ . unlift
ConcUnlift p l -> unsafeEff $ \ees -> do
es <- tailEnv ees
concUnliftIO ees p l $ \unlift -> do

View File

@ -8,17 +8,19 @@
module Effectful.Internal.Unlift
( -- * Unlifting strategies
UnliftStrategy(..)
, SyncPolicy(..)
, Persistence(..)
, Limit(..)
-- * Unlifting functions
, seqUnlift
, syncUnlift
, concUnlift
, ephemeralConcUnlift
, persistentConcUnlift
) where
import Control.Concurrent
import Control.Concurrent.STM
import Control.Exception
import Control.Monad
import GHC.Conc.Sync (ThreadId(..))
import GHC.Exts (mkWeak#, mkWeakNoFinalizer#)
@ -41,14 +43,41 @@ import Effectful.Internal.Utils
data UnliftStrategy
= SeqUnlift
-- ^ The fastest strategy and a default setting for t'Effectful.IOE'. An
-- attempt to call the unlifting function in threads distinct from its creator
-- attempt to call the unlifting function in thread distinct from its creator
-- will result in a runtime error.
| SyncUnlift !SyncPolicy
-- ^ Synchronized strategy is a middle ground between 'SeqUnlift' and
-- 'ConcUnlift'. It allows you to run the unlifting function in any thread as
-- long as only one unlifted computation runs at any given time.
--
-- Especially useful for cases where running unlifted computations in
-- different threads is an implementation detail and concurrency is not
-- observable from outside.
--
-- /Note:/ this strategy preserves changes made by unlifted computations to
-- thread local state.
--
-- 'SyncPolicy' determines what happens when you attempt to run an unlifted
-- computation while another one is already running.
--
-- @since 2.3.0.0
| ConcUnlift !Persistence !Limit
-- ^ A strategy that makes it possible for the unlifting function to be called
-- in threads distinct from its creator. See 'Persistence' and 'Limit'
-- settings for more information.
deriving (Eq, Generic, Ord, Show)
-- | Policy for the 'SyncUnlift' strategy when the unlifting function detects
-- that an unlifted computation is already running.
--
-- @since 2.3.0.0
data SyncPolicy
= SyncError
-- ^ Treat such case as an invariant violation and throw an error.
| SyncWait
-- ^ Wait until the computation finishes.
deriving (Eq, Generic, Ord, Show)
-- | Persistence setting for the 'ConcUnlift' strategy.
--
-- Different functions require different persistence strategies. Examples:
@ -106,9 +135,44 @@ seqUnlift k es unEff = do
then unEff m es
else error
$ "If you want to use the unlifting function to run Eff computations "
++ "in multiple threads, have a look at UnliftStrategy (ConcUnlift)."
++ "in multiple threads, have a look at the UnliftStrategy (SyncUnlift "
++ "or ConcUnlift)."
-- | Concurrent unlift for various strategies and limits.
-- | Synchronized unlift.
--
-- @since 2.3.0.0
syncUnlift
:: HasCallStack
=> SyncPolicy
-> ((forall r. m r -> IO r) -> IO a)
-> Env es
-> (forall r. m r -> Env es -> IO r)
-> IO a
syncUnlift policy k es unEff = do
-- Synchronization is tied to the storage of effects so that users don't
-- bypass it by creating multiple unlifting functions.
syncVar <- syncVarEnv es
activeVar <- newTVarIO True
let cleanUp = atomically $ do
inProgress <- readTVar syncVar
-- Wait for any unlifted computation to finish before exiting.
when inProgress retry
-- Prevent the unlifting function to be used out the scope.
writeTVar activeVar False
(`finally` cleanUp) $ k $ \m -> do
inlineBracket
(atomically $ do
active <- readTVar activeVar
unless active $ error "The unlifted function is no longer active"
inProgress <- swapTVar syncVar True
when inProgress $ case policy of
SyncError -> error "An unlifted computation is already running"
SyncWait -> retry
)
(\_ -> atomically $ writeTVar syncVar False)
(\_ -> unEff m es)
-- | Concurrent unlift.
concUnlift
:: HasCallStack
=> Persistence
@ -126,6 +190,9 @@ concUnlift Persistent (Limited threads) k =
concUnlift Persistent Unlimited k =
persistentConcUnlift True maxBound k
----------------------------------------
-- Internal
-- | Concurrent unlift that doesn't preserve the environment between calls to
-- the unlifting function in threads other than its creator.
ephemeralConcUnlift

View File

@ -5,6 +5,9 @@
easier to understand API.
* Add support for turning an effect handler into an effectful operation via the
`Provider` effect.
* Add the `SyncUnlift` strategy for cases where running unlifted computations in
different threads is an implementation detail and concurrency is not
observable from outside.
# effectful-2.2.2.0 (2023-01-11)
* Add `withSeqEffToIO` and `withConcEffToIO` to `Effectful`.

View File

@ -1,17 +1,25 @@
module UnliftTests (unliftTests) where
import Control.Concurrent
import Control.Exception
import Data.Functor
import Test.Tasty
import Test.Tasty.HUnit
import qualified UnliftIO.Async as A
import Effectful
import Effectful.State.Dynamic
import qualified Utils as U
unliftTests :: TestTree
unliftTests = testGroup "Unlift"
[ testCase "Strategy stays the same in a new thread" test_threadStrategy
, testCase "SeqUnlift in new thread" test_seqUnliftInNewThread
, testGroup "SyncUnlift"
[ testCase "SyncError works" test_syncErrorWorks
, testCase "SyncError throws when appropriate" test_syncErrorThrows
, testCase "SyncWait works" test_syncWaitWorks
]
, testGroup "Ephemeral strategy"
[ testCase "Invalid limit" test_ephemeralInvalid
, testCase "Uses in same thread" test_ephemeralSameThread
@ -38,6 +46,33 @@ test_seqUnliftInNewThread = runEff $ do
withEffToIO SeqUnlift $ \runInIO -> do
inThread $ runInIO $ return ()
test_syncErrorWorks :: Assertion
test_syncErrorWorks = runEff . evalStateLocal @Int 1 $ do
modifyFromAsync (*2)
modifyFromAsync (*3)
U.assertEqual "correct state" 6 =<< get @Int
where
modifyFromAsync f = withEffToIO (SyncUnlift SyncError) $ \runInIO -> do
void . A.async . runInIO $ liftIO (threadDelay 10000) >> modify @Int f
-- Wait for the async action to start, but try exiting the scope of
-- withEffToIO before the unlifted computation had a chance to finish to
-- check that it waits until it does.
threadDelay 1000
test_syncErrorThrows :: Assertion
test_syncErrorThrows = runEff $ do
assertThrowsUnliftError "Sync error" $ do
withEffToIO (SyncUnlift SyncError) $ \runInIO -> do
A.race_ (runInIO . liftIO $ threadDelay 10000)
(runInIO . liftIO $ threadDelay 10000)
test_syncWaitWorks :: Assertion
test_syncWaitWorks = runEff . evalStateLocal @Int 0 $ do
withEffToIO (SyncUnlift SyncWait) $ \runInIO -> do
-- SyncUnlift SyncWait turns concurrent code into sequential code.
A.replicateConcurrently_ 100 $ runInIO $ modify @Int (+1)
U.assertEqual "correct state" 100 =<< get @Int
test_ephemeralInvalid :: Assertion
test_ephemeralInvalid = runEff $ do
assertThrowsUnliftError "InvalidNumberOfUses error" $ do