diff --git a/effectful-core/CHANGELOG.md b/effectful-core/CHANGELOG.md index f9f3d13..b00b675 100644 --- a/effectful-core/CHANGELOG.md +++ b/effectful-core/CHANGELOG.md @@ -5,6 +5,9 @@ easier to understand API. * Add support for turning an effect handler into an effectful operation via the `Provider` effect. +* Add the `SyncUnlift` strategy for cases where running unlifted computations in + different threads is an implementation detail and concurrency is not + observable from outside. # effectful-core-2.2.2.2 (2023-03-13) * Allow `inject` to turn a monomorphic effect stack into a polymorphic one. diff --git a/effectful-core/effectful-core.cabal b/effectful-core/effectful-core.cabal index 8764122..4769cb3 100644 --- a/effectful-core/effectful-core.cabal +++ b/effectful-core/effectful-core.cabal @@ -64,6 +64,7 @@ library , exceptions >= 0.10.4 , monad-control >= 1.0.3 , primitive >= 0.7.3.0 + , stm >= 2.5 , transformers-base >= 0.4.6 , unliftio-core >= 0.2.0.1 diff --git a/effectful-core/src/Effectful.hs b/effectful-core/src/Effectful.hs index 67b3b65..ab25da7 100644 --- a/effectful-core/src/Effectful.hs +++ b/effectful-core/src/Effectful.hs @@ -40,6 +40,7 @@ module Effectful -- ** Unlifting , UnliftStrategy(..) + , SyncPolicy(..) , Persistence(..) , Limit(..) , unliftStrategy diff --git a/effectful-core/src/Effectful/Dispatch/Dynamic.hs b/effectful-core/src/Effectful/Dispatch/Dynamic.hs index b33c42c..4a3cb49 100644 --- a/effectful-core/src/Effectful/Dispatch/Dynamic.hs +++ b/effectful-core/src/Effectful/Dispatch/Dynamic.hs @@ -499,6 +499,9 @@ localUnlift (LocalEnv les) strategy k = case strategy of SeqUnlift -> unsafeEff $ \es -> do seqUnliftIO les $ \unlift -> do (`unEff` es) $ k $ unsafeEff_ . unlift + SyncUnlift p -> unsafeEff $ \es -> do + syncUnliftIO les p $ \unlift -> do + (`unEff` es) $ k $ unsafeEff_ . unlift ConcUnlift p l -> unsafeEff $ \es -> do concUnliftIO les p l $ \unlift -> do (`unEff` es) $ k $ unsafeEff_ . unlift @@ -514,6 +517,7 @@ localUnliftIO -> Eff es a localUnliftIO (LocalEnv les) strategy k = case strategy of SeqUnlift -> liftIO $ seqUnliftIO les k + SyncUnlift p -> liftIO $ syncUnliftIO les p k ConcUnlift p l -> liftIO $ concUnliftIO les p l k ---------------------------------------- @@ -553,6 +557,9 @@ localLift !_ strategy k = case strategy of SeqUnlift -> unsafeEff $ \es -> do seqUnliftIO es $ \unlift -> do (`unEff` es) $ k $ unsafeEff_ . unlift + SyncUnlift p -> unsafeEff $ \es -> do + syncUnliftIO es p $ \unlift -> do + (`unEff` es) $ k $ unsafeEff_ . unlift ConcUnlift p l -> unsafeEff $ \es -> do concUnliftIO es p l $ \unlift -> do (`unEff` es) $ k $ unsafeEff_ . unlift @@ -642,6 +649,10 @@ localLiftUnlift (LocalEnv les) strategy k = case strategy of seqUnliftIO es $ \unliftEs -> do seqUnliftIO les $ \unliftLocalEs -> do (`unEff` es) $ k (unsafeEff_ . unliftEs) (unsafeEff_ . unliftLocalEs) + SyncUnlift p -> unsafeEff $ \es -> do + syncUnliftIO es p $ \unliftEs -> do + syncUnliftIO les p $ \unliftLocalEs -> do + (`unEff` es) $ k (unsafeEff_ . unliftEs) (unsafeEff_ . unliftLocalEs) ConcUnlift p l -> unsafeEff $ \es -> do concUnliftIO es p l $ \unliftEs -> do concUnliftIO les p l $ \unliftLocalEs -> do @@ -665,6 +676,7 @@ localLiftUnliftIO -> Eff es a localLiftUnliftIO (LocalEnv les) strategy k = case strategy of SeqUnlift -> liftIO $ seqUnliftIO les $ k unsafeEff_ + SyncUnlift p -> liftIO $ syncUnliftIO les p $ k unsafeEff_ ConcUnlift p l -> liftIO $ concUnliftIO les p l $ k unsafeEff_ ---------------------------------------- diff --git a/effectful-core/src/Effectful/Internal/Env.hs b/effectful-core/src/Effectful/Internal/Env.hs index 4841ff7..23092dc 100644 --- a/effectful-core/src/Effectful/Internal/Env.hs +++ b/effectful-core/src/Effectful/Internal/Env.hs @@ -20,6 +20,7 @@ module Effectful.Internal.Env , cloneEnv , restoreEnv , sizeEnv + , syncVarEnv , tailEnv -- ** Modification of the effect stack @@ -37,6 +38,7 @@ module Effectful.Internal.Env , modifyEnv ) where +import Control.Concurrent.STM.TVar import Control.Monad import Control.Monad.Primitive import Data.Primitive.PrimArray @@ -84,6 +86,7 @@ data Storage = Storage , stVersions :: !(MutablePrimArray RealWorld Int) , stEffects :: !(SmallMutableArray RealWorld Any) , stRelinkers :: !(SmallMutableArray RealWorld Any) + , stSyncVar :: !(TVar Bool) } ---------------------------------------- @@ -132,7 +135,7 @@ emptyEnv = Env 0 -- | Clone the environment to use it in a different thread. cloneEnv :: Env es -> IO (Env es) cloneEnv (Env offset refs storage0) = do - Storage storageSize version vs0 es0 fs0 <- readIORef' storage0 + Storage storageSize version vs0 es0 fs0 _ <- readIORef' storage0 let vsSize = sizeofMutablePrimArray vs0 esSize = sizeofSmallMutableArray es0 fsSize = sizeofSmallMutableArray fs0 @@ -143,7 +146,8 @@ cloneEnv (Env offset refs storage0) = do vs <- cloneMutablePrimArray vs0 0 vsSize es <- cloneSmallMutableArray es0 0 esSize fs <- cloneSmallMutableArray fs0 0 fsSize - storage <- newIORef' $ Storage storageSize version vs es fs + syncVar <- newTVarIO False + storage <- newIORef' $ Storage storageSize version vs es fs syncVar let relinkEffects = \case 0 -> pure () k -> do @@ -184,6 +188,12 @@ sizeEnv :: Env es -> IO Int sizeEnv (Env offset refs _) = do pure $ (sizeofPrimArray refs - offset) `div` 2 +-- | Get the variable for the synchronized unlift. +-- +-- @since 2.3.0.0 +syncVarEnv :: Env es -> IO (TVar Bool) +syncVarEnv env = stSyncVar <$> readIORef' (envStorage env) + -- | Access the tail of the environment. tailEnv :: Env (e : es) -> IO (Env es) tailEnv (Env offset refs storage) = do @@ -348,7 +358,7 @@ getLocation (Env offset refs storage) = do let i = offset + 2 * reifyIndex @e @es ref = indexPrimArray refs i version = indexPrimArray refs (i + 1) - Storage _ _ vs es _ <- readIORef' storage + Storage _ _ vs es _ _ <- readIORef' storage storageVersion <- readPrimArray vs ref -- If version of the reference is different than version in the storage, it -- means that the effect in the storage is not the one that was initially @@ -367,6 +377,7 @@ emptyStorage = Storage 0 (noVersion + 1) <$> newPrimArray 0 <*> newSmallArray 0 undefinedData <*> newSmallArray 0 undefinedData + <*> newTVarIO False -- | Insert an effect into the storage and return its reference. insertEffect @@ -376,7 +387,7 @@ insertEffect -> Relinker (EffectRep (DispatchOf e)) e -> IO (Int, Int) insertEffect storage e f = do - Storage size version vs0 es0 fs0 <- readIORef' storage + Storage size version vs0 es0 fs0 syncVar <- readIORef' storage let len0 = sizeofSmallMutableArray es0 case size `compare` len0 of GT -> error $ "size (" ++ show size ++ ") > len0 (" ++ show len0 ++ ")" @@ -384,7 +395,7 @@ insertEffect storage e f = do writePrimArray vs0 size version writeSmallArray' es0 size (toAny e) writeSmallArray' fs0 size (toAny f) - writeIORef' storage $ Storage (size + 1) (version + 1) vs0 es0 fs0 + writeIORef' storage $ Storage (size + 1) (version + 1) vs0 es0 fs0 syncVar pure (size, version) EQ -> do let len = doubleCapacity len0 @@ -397,20 +408,20 @@ insertEffect storage e f = do writePrimArray vs size version writeSmallArray' es size (toAny e) writeSmallArray' fs size (toAny f) - writeIORef' storage $ Storage (size + 1) (version + 1) vs es fs + writeIORef' storage $ Storage (size + 1) (version + 1) vs es fs syncVar pure (size, version) -- | Given a reference to an effect from the top of the stack, delete it from -- the storage. deleteEffect :: IORef' Storage -> Int -> IO () deleteEffect storage ref = do - Storage size version vs es fs <- readIORef' storage + Storage size version vs es fs syncVar <- readIORef' storage when (ref /= size - 1) $ do error $ "ref (" ++ show ref ++ ") /= size - 1 (" ++ show (size - 1) ++ ")" writePrimArray vs ref noVersion writeSmallArray es ref undefinedData writeSmallArray fs ref undefinedData - writeIORef' storage $ Storage (size - 1) version vs es fs + writeIORef' storage $ Storage (size - 1) version vs es fs syncVar -- | Relink the environment to use the new storage. relinkEnv :: IORef' Storage -> Env es -> IO (Env es) diff --git a/effectful-core/src/Effectful/Internal/Monad.hs b/effectful-core/src/Effectful/Internal/Monad.hs index 869c9a3..0a2851e 100644 --- a/effectful-core/src/Effectful/Internal/Monad.hs +++ b/effectful-core/src/Effectful/Internal/Monad.hs @@ -40,6 +40,7 @@ module Effectful.Internal.Monad -- * Unlifting , UnliftStrategy(..) + , SyncPolicy(..) , Persistence(..) , Limit(..) , unliftStrategy @@ -50,6 +51,7 @@ module Effectful.Internal.Monad -- ** Low-level unlifts , seqUnliftIO + , syncUnliftIO , concUnliftIO -- * Dispatch @@ -202,6 +204,7 @@ withEffToIO -> Eff es a withEffToIO strategy f = case strategy of SeqUnlift -> unsafeEff $ \es -> seqUnliftIO es f + SyncUnlift p -> unsafeEff $ \es -> syncUnliftIO es p f ConcUnlift p b -> unsafeEff $ \es -> concUnliftIO es p b f -- | Create an unlifting function with the 'ConcUnlift' strategy. @@ -228,6 +231,19 @@ seqUnliftIO -> IO a seqUnliftIO es k = seqUnlift k es unEff +-- | Create an unlifting function with the 'SyncUnlift' strategy. +-- +-- @since 2.3.0.0 +syncUnliftIO + :: HasCallStack + => Env es + -- ^ The environment. + -> SyncPolicy + -> ((forall r. Eff es r -> IO r) -> IO a) + -- ^ Continuation with the unlifting function in scope. + -> IO a +syncUnliftIO es policy k = syncUnlift policy k es unEff + -- | Create an unlifting function with the 'ConcUnlift' strategy. concUnliftIO :: HasCallStack @@ -415,6 +431,10 @@ raiseWith strategy k = case strategy of es <- tailEnv ees seqUnliftIO ees $ \unlift -> do (`unEff` es) $ k $ unsafeEff_ . unlift + SyncUnlift p -> unsafeEff $ \ees -> do + es <- tailEnv ees + syncUnliftIO ees p $ \unlift -> do + (`unEff` es) $ k $ unsafeEff_ . unlift ConcUnlift p l -> unsafeEff $ \ees -> do es <- tailEnv ees concUnliftIO ees p l $ \unlift -> do diff --git a/effectful-core/src/Effectful/Internal/Unlift.hs b/effectful-core/src/Effectful/Internal/Unlift.hs index 9d14840..553b1df 100644 --- a/effectful-core/src/Effectful/Internal/Unlift.hs +++ b/effectful-core/src/Effectful/Internal/Unlift.hs @@ -8,17 +8,19 @@ module Effectful.Internal.Unlift ( -- * Unlifting strategies UnliftStrategy(..) + , SyncPolicy(..) , Persistence(..) , Limit(..) -- * Unlifting functions , seqUnlift + , syncUnlift , concUnlift - , ephemeralConcUnlift - , persistentConcUnlift ) where import Control.Concurrent +import Control.Concurrent.STM +import Control.Exception import Control.Monad import GHC.Conc.Sync (ThreadId(..)) import GHC.Exts (mkWeak#, mkWeakNoFinalizer#) @@ -41,14 +43,41 @@ import Effectful.Internal.Utils data UnliftStrategy = SeqUnlift -- ^ The fastest strategy and a default setting for t'Effectful.IOE'. An - -- attempt to call the unlifting function in threads distinct from its creator + -- attempt to call the unlifting function in thread distinct from its creator -- will result in a runtime error. + | SyncUnlift !SyncPolicy + -- ^ Synchronized strategy is a middle ground between 'SeqUnlift' and + -- 'ConcUnlift'. It allows you to run the unlifting function in any thread as + -- long as only one unlifted computation runs at any given time. + -- + -- Especially useful for cases where running unlifted computations in + -- different threads is an implementation detail and concurrency is not + -- observable from outside. + -- + -- /Note:/ this strategy preserves changes made by unlifted computations to + -- thread local state. + -- + -- 'SyncPolicy' determines what happens when you attempt to run an unlifted + -- computation while another one is already running. + -- + -- @since 2.3.0.0 | ConcUnlift !Persistence !Limit -- ^ A strategy that makes it possible for the unlifting function to be called -- in threads distinct from its creator. See 'Persistence' and 'Limit' -- settings for more information. deriving (Eq, Generic, Ord, Show) +-- | Policy for the 'SyncUnlift' strategy when the unlifting function detects +-- that an unlifted computation is already running. +-- +-- @since 2.3.0.0 +data SyncPolicy + = SyncError + -- ^ Treat such case as an invariant violation and throw an error. + | SyncWait + -- ^ Wait until the computation finishes. + deriving (Eq, Generic, Ord, Show) + -- | Persistence setting for the 'ConcUnlift' strategy. -- -- Different functions require different persistence strategies. Examples: @@ -106,9 +135,44 @@ seqUnlift k es unEff = do then unEff m es else error $ "If you want to use the unlifting function to run Eff computations " - ++ "in multiple threads, have a look at UnliftStrategy (ConcUnlift)." + ++ "in multiple threads, have a look at the UnliftStrategy (SyncUnlift " + ++ "or ConcUnlift)." --- | Concurrent unlift for various strategies and limits. +-- | Synchronized unlift. +-- +-- @since 2.3.0.0 +syncUnlift + :: HasCallStack + => SyncPolicy + -> ((forall r. m r -> IO r) -> IO a) + -> Env es + -> (forall r. m r -> Env es -> IO r) + -> IO a +syncUnlift policy k es unEff = do + -- Synchronization is tied to the storage of effects so that users don't + -- bypass it by creating multiple unlifting functions. + syncVar <- syncVarEnv es + activeVar <- newTVarIO True + let cleanUp = atomically $ do + inProgress <- readTVar syncVar + -- Wait for any unlifted computation to finish before exiting. + when inProgress retry + -- Prevent the unlifting function to be used out the scope. + writeTVar activeVar False + (`finally` cleanUp) $ k $ \m -> do + inlineBracket + (atomically $ do + active <- readTVar activeVar + unless active $ error "The unlifted function is no longer active" + inProgress <- swapTVar syncVar True + when inProgress $ case policy of + SyncError -> error "An unlifted computation is already running" + SyncWait -> retry + ) + (\_ -> atomically $ writeTVar syncVar False) + (\_ -> unEff m es) + +-- | Concurrent unlift. concUnlift :: HasCallStack => Persistence @@ -126,6 +190,9 @@ concUnlift Persistent (Limited threads) k = concUnlift Persistent Unlimited k = persistentConcUnlift True maxBound k +---------------------------------------- +-- Internal + -- | Concurrent unlift that doesn't preserve the environment between calls to -- the unlifting function in threads other than its creator. ephemeralConcUnlift diff --git a/effectful/CHANGELOG.md b/effectful/CHANGELOG.md index f53026e..bde06bb 100644 --- a/effectful/CHANGELOG.md +++ b/effectful/CHANGELOG.md @@ -5,6 +5,9 @@ easier to understand API. * Add support for turning an effect handler into an effectful operation via the `Provider` effect. +* Add the `SyncUnlift` strategy for cases where running unlifted computations in + different threads is an implementation detail and concurrency is not + observable from outside. # effectful-2.2.2.0 (2023-01-11) * Add `withSeqEffToIO` and `withConcEffToIO` to `Effectful`. diff --git a/effectful/tests/UnliftTests.hs b/effectful/tests/UnliftTests.hs index 0a9e110..d70555e 100644 --- a/effectful/tests/UnliftTests.hs +++ b/effectful/tests/UnliftTests.hs @@ -1,17 +1,25 @@ module UnliftTests (unliftTests) where +import Control.Concurrent import Control.Exception +import Data.Functor import Test.Tasty import Test.Tasty.HUnit import qualified UnliftIO.Async as A import Effectful +import Effectful.State.Dynamic import qualified Utils as U unliftTests :: TestTree unliftTests = testGroup "Unlift" [ testCase "Strategy stays the same in a new thread" test_threadStrategy , testCase "SeqUnlift in new thread" test_seqUnliftInNewThread + , testGroup "SyncUnlift" + [ testCase "SyncError works" test_syncErrorWorks + , testCase "SyncError throws when appropriate" test_syncErrorThrows + , testCase "SyncWait works" test_syncWaitWorks + ] , testGroup "Ephemeral strategy" [ testCase "Invalid limit" test_ephemeralInvalid , testCase "Uses in same thread" test_ephemeralSameThread @@ -38,6 +46,33 @@ test_seqUnliftInNewThread = runEff $ do withEffToIO SeqUnlift $ \runInIO -> do inThread $ runInIO $ return () +test_syncErrorWorks :: Assertion +test_syncErrorWorks = runEff . evalStateLocal @Int 1 $ do + modifyFromAsync (*2) + modifyFromAsync (*3) + U.assertEqual "correct state" 6 =<< get @Int + where + modifyFromAsync f = withEffToIO (SyncUnlift SyncError) $ \runInIO -> do + void . A.async . runInIO $ liftIO (threadDelay 10000) >> modify @Int f + -- Wait for the async action to start, but try exiting the scope of + -- withEffToIO before the unlifted computation had a chance to finish to + -- check that it waits until it does. + threadDelay 1000 + +test_syncErrorThrows :: Assertion +test_syncErrorThrows = runEff $ do + assertThrowsUnliftError "Sync error" $ do + withEffToIO (SyncUnlift SyncError) $ \runInIO -> do + A.race_ (runInIO . liftIO $ threadDelay 10000) + (runInIO . liftIO $ threadDelay 10000) + +test_syncWaitWorks :: Assertion +test_syncWaitWorks = runEff . evalStateLocal @Int 0 $ do + withEffToIO (SyncUnlift SyncWait) $ \runInIO -> do + -- SyncUnlift SyncWait turns concurrent code into sequential code. + A.replicateConcurrently_ 100 $ runInIO $ modify @Int (+1) + U.assertEqual "correct state" 100 =<< get @Int + test_ephemeralInvalid :: Assertion test_ephemeralInvalid = runEff $ do assertThrowsUnliftError "InvalidNumberOfUses error" $ do