mirror of
https://github.com/sayo-hs/heftia.git
synced 2024-11-22 18:36:15 +03:00
[WIP] composable streaming pipe using first-order async effects (though this idea might not be good...)
This commit is contained in:
parent
81f628a701
commit
5fa7ac28a4
@ -13,7 +13,7 @@ allow-newer: eff:primitive
|
||||
source-repository-package
|
||||
type: git
|
||||
location: https://github.com/sayo-hs/data-effects
|
||||
tag: 9fd125aecb96c7d56db7c464abd31056bc17c921
|
||||
tag: 1fbb3f73e7ba4e9e7e69cd09f563416cab032fb5
|
||||
subdir: data-effects-core
|
||||
subdir: data-effects-th
|
||||
subdir: data-effects
|
||||
|
@ -10,14 +10,16 @@ import Control.Monad.Hefty (
|
||||
Eff,
|
||||
interpret,
|
||||
liftIO,
|
||||
raise,
|
||||
(&),
|
||||
type (<:),
|
||||
type (<<|),
|
||||
type (<|),
|
||||
type (~>),
|
||||
)
|
||||
import Control.Monad.Hefty.Concurrent.Parallel (runParallelIO)
|
||||
import Control.Monad.Hefty.Concurrent.Async (runAsyncIO, runAsyncSeq)
|
||||
import Control.Monad.Hefty.Concurrent.Stream (connect)
|
||||
import Control.Monad.Hefty.Concurrent.Timer (Timer, runTimerIO, sleep)
|
||||
import Control.Monad.Hefty.Except (runThrow, throw)
|
||||
import Control.Monad.Hefty.Input (Input, input)
|
||||
import Control.Monad.Hefty.Output (Output, output)
|
||||
@ -49,24 +51,37 @@ This function is equivalent to the following (as a result of reducing 'runThrow'
|
||||
|
||||
@
|
||||
produce = void do
|
||||
for_ [1 .. 2] \(i :: Int) -> do
|
||||
for_ [1 .. 4] \(i :: Int) -> do
|
||||
output i
|
||||
sleep 0.5
|
||||
@
|
||||
-}
|
||||
produce :: (Output Int <| ef) => Eff '[] ef ()
|
||||
produce = void . runThrow @() $ do
|
||||
for_ [1 .. 3] \(i :: Int) -> do
|
||||
when (i >= 3) $ throw ()
|
||||
output i
|
||||
produce :: (Output Int <| ef, Timer <| ef) => Int -> Eff '[] ef ()
|
||||
produce n = void . runThrow @() $ do
|
||||
for_ [1 ..] \(i :: Int) -> do
|
||||
when (i == 5) $ throw ()
|
||||
output $ n + i
|
||||
|
||||
-- sleep 0.5
|
||||
|
||||
consume :: (Input Int <: m, MonadIO m) => m ()
|
||||
consume = forever do
|
||||
liftIO . print =<< input @Int
|
||||
|
||||
plus100 :: (Input Int <: m, Output Int <: m, MonadIO m) => m ()
|
||||
plus100 = forever do
|
||||
i <- input @Int
|
||||
liftIO $ print i
|
||||
output $ i + 100
|
||||
|
||||
main :: IO ()
|
||||
main = runUnliftIO . runResourceIO . runParallelIO $ do
|
||||
_ <- runSomeResource $ connect @Int produce consume
|
||||
_ <- runSomeResource $ connect @Int produce consume
|
||||
main = runUnliftIO . runTimerIO . runResourceIO $ do
|
||||
-- _ <- runSomeResource $ runAsyncIO $ connect @Int (produce 0) consume
|
||||
-- _ <- runSomeResource $ runAsyncIO $ connect @Int (connect @Int (produce 0) plus100) consume
|
||||
_ <-
|
||||
runAsyncIO do
|
||||
let m = connect @Int (raise (produce 1000)) consume
|
||||
connect @Int m consume
|
||||
pure ()
|
||||
|
||||
{-
|
||||
|
@ -1,33 +0,0 @@
|
||||
{-# OPTIONS_GHC -fplugin GHC.TypeLits.KnownNat.Solver #-}
|
||||
|
||||
-- SPDX-License-Identifier: MPL-2.0
|
||||
|
||||
module BenchParallel where
|
||||
|
||||
import Control.Monad (liftM2)
|
||||
import Control.Monad.Hefty (Eff, type (<:), type (<|))
|
||||
import Control.Monad.Hefty.Concurrent.Parallel (runParallelIO)
|
||||
import Control.Monad.Hefty.Concurrent.Stream (closing, connect)
|
||||
import Control.Monad.Hefty.Input (Input, input)
|
||||
import Control.Monad.Hefty.Output (Output, output)
|
||||
import Control.Monad.Hefty.Unlift (runUnliftIO)
|
||||
import Control.Monad.IO.Class (MonadIO)
|
||||
import Data.Foldable (for_)
|
||||
import Data.Maybe (fromJust)
|
||||
import Data.These.Combinators (justThese)
|
||||
import System.IO.Unsafe (unsafePerformIO)
|
||||
|
||||
produce :: (Output Int <| ef) => Int -> Eff '[] ef ()
|
||||
produce n =
|
||||
for_ [1 .. n] \(i :: Int) -> do
|
||||
output i
|
||||
|
||||
consume :: (Input Int <: m, MonadIO m) => Int -> m [Int]
|
||||
consume 0 = pure []
|
||||
consume n = liftM2 (:) (input @Int) (consume (n - 1))
|
||||
|
||||
parallel :: Int -> [Int]
|
||||
parallel n = unsafePerformIO . runUnliftIO . runParallelIO $ do
|
||||
stat <- connect @Int (produce n) (consume n)
|
||||
pure $ snd . fromJust $ justThese $ closing stat
|
||||
{-# NOINLINE parallel #-}
|
@ -7,7 +7,6 @@ import BenchCatch
|
||||
import BenchCoroutine
|
||||
import BenchCountdown
|
||||
import BenchLocal
|
||||
import BenchParallel
|
||||
import BenchPyth
|
||||
import Data.Functor ((<&>))
|
||||
import Test.Tasty.Bench
|
||||
@ -141,9 +140,4 @@ main =
|
||||
, bench "eff.5+5" $ nf coroutineEffDeep x
|
||||
, bench "mp.5+5" $ nf coroutineMpDeep x
|
||||
]
|
||||
, bgroup "parallel" $
|
||||
[10000] <&> \x ->
|
||||
bgroup
|
||||
(show x)
|
||||
[bench "parallel" $ nf parallel x]
|
||||
]
|
||||
|
@ -64,6 +64,7 @@ common common-base
|
||||
containers > 0.6.5 && < 0.8,
|
||||
these,
|
||||
co-log-core,
|
||||
random,
|
||||
|
||||
ghc-options: -Wall
|
||||
|
||||
@ -87,6 +88,7 @@ library
|
||||
Control.Monad.Hefty.Fresh
|
||||
Control.Monad.Hefty.Fail
|
||||
Control.Monad.Hefty.Concurrent.Parallel
|
||||
Control.Monad.Hefty.Concurrent.Async
|
||||
Control.Monad.Hefty.Concurrent.Stream
|
||||
Control.Monad.Hefty.Concurrent.Timer
|
||||
Control.Monad.Hefty.Log
|
||||
@ -121,6 +123,7 @@ library
|
||||
Data.Effect.Fresh,
|
||||
Data.Effect.Fail,
|
||||
Data.Effect.Concurrent.Parallel,
|
||||
Data.Effect.Concurrent.Async,
|
||||
Data.Effect.Concurrent.Timer,
|
||||
Data.Effect.Log,
|
||||
|
||||
@ -142,6 +145,7 @@ test-suite test
|
||||
Test.Writer
|
||||
Test.Pyth
|
||||
Test.Coroutine
|
||||
Test.Async
|
||||
|
||||
hs-source-dirs: test
|
||||
|
||||
@ -272,4 +276,3 @@ benchmark heftia-bench
|
||||
BenchLocal
|
||||
BenchCoroutine
|
||||
BenchPyth
|
||||
BenchParallel
|
||||
|
164
heftia-effects/src/Control/Monad/Hefty/Concurrent/Async.hs
Normal file
164
heftia-effects/src/Control/Monad/Hefty/Concurrent/Async.hs
Normal file
@ -0,0 +1,164 @@
|
||||
{-# OPTIONS_GHC -fplugin GHC.TypeLits.KnownNat.Solver #-}
|
||||
|
||||
-- SPDX-License-Identifier: MPL-2.0
|
||||
|
||||
module Control.Monad.Hefty.Concurrent.Async (
|
||||
module Control.Monad.Hefty.Concurrent.Async,
|
||||
module Data.Effect.Concurrent.Async,
|
||||
module Data.Effect.NonDet,
|
||||
)
|
||||
where
|
||||
|
||||
import Control.Arrow ((>>>))
|
||||
import Control.Exception (AsyncException (ThreadKilled), BlockedIndefinitelyOnSTM (BlockedIndefinitelyOnSTM), SomeException, fromException)
|
||||
import Control.Monad (void)
|
||||
import Control.Monad.Hefty (
|
||||
Eff,
|
||||
MemberBy,
|
||||
bundleN,
|
||||
interpretH,
|
||||
nil,
|
||||
unkey,
|
||||
(!+),
|
||||
(&),
|
||||
type (+),
|
||||
type (<<|),
|
||||
type (<|),
|
||||
type (~>),
|
||||
)
|
||||
import Control.Monad.Hefty.Interpret (reinterpretWith)
|
||||
import Control.Monad.Hefty.Unlift (UnliftIO)
|
||||
import Data.Effect.Concurrent.Async
|
||||
import Data.Effect.Concurrent.Parallel (Parallel)
|
||||
import Data.Effect.NonDet
|
||||
import Data.Effect.Unlift (withRunInIO)
|
||||
import Data.Functor.Const (Const (Const))
|
||||
import Data.Functor.Identity (Identity (Identity))
|
||||
import Data.Set qualified as Set
|
||||
import Data.Void (Void, absurd)
|
||||
import Data.Word (Word8)
|
||||
import Debug.Trace (trace)
|
||||
import GHC.Conc (retry)
|
||||
import GHC.Generics (type (:+:) (L1, R1))
|
||||
import System.Random (randomIO)
|
||||
import UnliftIO (
|
||||
atomically,
|
||||
catch,
|
||||
catchJust,
|
||||
finally,
|
||||
liftIO,
|
||||
mask,
|
||||
modifyTVar,
|
||||
newEmptyTMVarIO,
|
||||
newTVarIO,
|
||||
putTMVar,
|
||||
readTMVar,
|
||||
readTVarIO,
|
||||
throwIO,
|
||||
tryReadTMVar,
|
||||
uninterruptibleMask_,
|
||||
)
|
||||
import UnliftIO.Concurrent (ThreadId, forkIO, killThread)
|
||||
|
||||
type HasAsync ef f = MemberBy AsyncKey (Async' f) ef
|
||||
|
||||
runAsyncSeq :: Eff '[] (Async (Const (Eff eh ef ans) + Identity) ': ef) ans -> Eff eh ef ans
|
||||
runAsyncSeq =
|
||||
unkey >>> reinterpretWith \case
|
||||
Fork -> \k -> k $ Left \x ->
|
||||
L1 . Const . k . Right $
|
||||
Future
|
||||
(R1 $ Identity x)
|
||||
(R1 $ Identity $ Just x)
|
||||
(R1 $ Identity ())
|
||||
Perform (L1 (Const k)) -> const k
|
||||
Perform (R1 (Identity x)) -> ($ x)
|
||||
|
||||
runAsyncIO :: (IO <| ef, UnliftIO <<| eh) => Eff '[] (Async (Const (IO Void) + IO) ': ef) ~> Eff eh ef
|
||||
runAsyncIO m = do
|
||||
zombieThreads <- newTVarIO Set.empty
|
||||
|
||||
r <-
|
||||
m
|
||||
& unkey
|
||||
& reinterpretWith \case
|
||||
Fork -> \k ->
|
||||
withRunInIO \run -> do
|
||||
chan <- newEmptyTMVarIO
|
||||
|
||||
mask \restore -> do
|
||||
t <-
|
||||
forkIO do
|
||||
uid <- randomIO @Word8
|
||||
liftIO $ putStrLn $ "Start thread " <> show uid
|
||||
catchJust
|
||||
(\e -> if fromException e == Just ThreadKilled then Nothing else Just e)
|
||||
( void . restore . run $ k $ Left \s ->
|
||||
L1 $ Const $ do
|
||||
liftIO $ putStrLn $ "Performed write on " <> show uid
|
||||
atomically $ putTMVar chan . Right $ s
|
||||
atomically retry
|
||||
)
|
||||
(atomically . putTMVar chan . Left)
|
||||
|
||||
liftIO $ putStrLn $ "START " <> show t
|
||||
|
||||
atomically $ modifyTVar zombieThreads $ Set.insert t
|
||||
|
||||
let kill = do
|
||||
liftIO $ putStrLn $ "KILL " <> show t
|
||||
killThread t
|
||||
atomically (modifyTVar zombieThreads $ Set.delete t)
|
||||
|
||||
restore . run . k . Right $
|
||||
Future
|
||||
( R1 $
|
||||
traceSTMBlock t (atomically (readTMVar chan)) `finally` kill >>= \case
|
||||
Right r -> pure r
|
||||
Left (e :: SomeException) -> do
|
||||
throwIO e
|
||||
)
|
||||
(R1 $ atomically $ (either (const Nothing) Just =<<) <$> tryReadTMVar chan)
|
||||
(R1 kill)
|
||||
Perform (L1 (Const send)) -> const $ liftIO $ absurd <$> send
|
||||
Perform (R1 a) -> (liftIO a >>=)
|
||||
|
||||
mapM_
|
||||
( \t -> do
|
||||
liftIO $ putStrLn $ "KILL ZOMBIE " <> show t
|
||||
killThread t
|
||||
)
|
||||
=<< readTVarIO zombieThreads
|
||||
|
||||
pure r
|
||||
|
||||
traceSTMBlock :: ThreadId -> IO a -> IO a
|
||||
traceSTMBlock t m = do
|
||||
putStrLn $ "read " <> show t
|
||||
m `catch` \BlockedIndefinitelyOnSTM -> throwIO $ userError $ "STM blocked for read async thread " <> show t
|
||||
|
||||
runParallelAsync :: (HasAsync ef f) => Eff (Parallel ': eh) ef ~> Eff eh ef
|
||||
runParallelAsync = interpretH parallelToAsync
|
||||
|
||||
runNonDetRaceIO
|
||||
:: (IO <| ef, UnliftIO <<| eh)
|
||||
=> Eff '[] (Choose ': Empty ': ef) ~> Eff eh ef
|
||||
runNonDetRaceIO =
|
||||
bundleN @2
|
||||
>>> reinterpretWith
|
||||
( ( \Choose k -> withRunInIO \run -> do
|
||||
var <- newEmptyTMVarIO
|
||||
mask \restore -> do
|
||||
let runThread b = forkIO $ do
|
||||
x <- restore $ run $ k b
|
||||
atomically $ putTMVar var x
|
||||
|
||||
t1 <- runThread False
|
||||
t2 <- runThread True
|
||||
|
||||
atomically (readTMVar var)
|
||||
<* uninterruptibleMask_ (killThread t1 *> killThread t2)
|
||||
)
|
||||
!+ (\Empty _ -> liftIO $ atomically retry)
|
||||
!+ nil
|
||||
)
|
@ -2,8 +2,17 @@
|
||||
|
||||
module Control.Monad.Hefty.Concurrent.Stream where
|
||||
|
||||
import Control.Monad.Hefty (Eff, interpretBy, raiseAllH, (&), type (<<|))
|
||||
import Control.Monad.Hefty.Concurrent.Parallel (Parallel, liftP2)
|
||||
import Control.Monad.Hefty (
|
||||
Eff,
|
||||
interpretBy,
|
||||
raiseAllH,
|
||||
unkey,
|
||||
untag,
|
||||
(&),
|
||||
type (#),
|
||||
type (#>),
|
||||
)
|
||||
import Control.Monad.Hefty.Concurrent.Async (HasAsync, liftAsync2)
|
||||
import Control.Monad.Hefty.Coroutine (Status (Continue, Done))
|
||||
import Control.Monad.Hefty.Input (Input (Input))
|
||||
import Control.Monad.Hefty.Output (Output (Output))
|
||||
@ -11,8 +20,8 @@ import Data.Bifunctor (Bifunctor, bimap)
|
||||
import Data.These (These (That, These, This))
|
||||
|
||||
connect
|
||||
:: forall v a b eh ef
|
||||
. (Parallel <<| eh)
|
||||
:: forall v a b eh ef f
|
||||
. (HasAsync ef f)
|
||||
=> Eff '[] (Output v ': ef) a
|
||||
-> Eff '[] (Input v ': ef) b
|
||||
-> Eff eh ef (StreamStatus (Eff '[] ef) v a b)
|
||||
@ -21,14 +30,32 @@ connect a b =
|
||||
(a & interpretBy (pure . Done) \(Output v) k -> pure $ Continue v k)
|
||||
(b & interpretBy (pure . Done) \Input k -> pure $ Continue () k)
|
||||
|
||||
connect'
|
||||
:: forall o i v a b eh ef f
|
||||
. (HasAsync ef f)
|
||||
=> Eff '[] (Output v # o ': ef) a
|
||||
-> Eff '[] (Input v # i ': ef) b
|
||||
-> Eff eh ef (StreamStatus (Eff '[] ef) v a b)
|
||||
connect' a b = connect (untag a) (untag b)
|
||||
{-# INLINE connect' #-}
|
||||
|
||||
connect''
|
||||
:: forall o i v a b eh ef f
|
||||
. (HasAsync ef f)
|
||||
=> Eff '[] (o #> Output v ': ef) a
|
||||
-> Eff '[] (i #> Input v ': ef) b
|
||||
-> Eff eh ef (StreamStatus (Eff '[] ef) v a b)
|
||||
connect'' a b = connect (unkey a) (unkey b)
|
||||
{-# INLINE connect'' #-}
|
||||
|
||||
runStream
|
||||
:: forall v a b eh ef
|
||||
. (Parallel <<| eh)
|
||||
:: forall v a b eh ef f
|
||||
. (HasAsync ef f)
|
||||
=> Eff '[] ef (Status (Eff '[] ef) v () a)
|
||||
-> Eff '[] ef (Status (Eff '[] ef) () v b)
|
||||
-> Eff eh ef (StreamStatus (Eff '[] ef) v a b)
|
||||
runStream a b = do
|
||||
(a', b') <- liftP2 (,) (raiseAllH a) (raiseAllH b)
|
||||
(a', b') <- liftAsync2 (,) (raiseAllH a) (raiseAllH b)
|
||||
|
||||
case (a', b') of
|
||||
(Done x, Done y) -> pure $ Equilibrium x y
|
||||
|
@ -23,7 +23,7 @@ import Control.Monad.Hefty (
|
||||
)
|
||||
import Control.Monad.Hefty.Coroutine (runCoroutine)
|
||||
import Control.Monad.Hefty.State (evalState)
|
||||
import Data.Effect.Concurrent.Timer (CyclicTimer (Wait), Timer (Clock, Sleep), clock, cyclicTimer)
|
||||
import Data.Effect.Concurrent.Timer
|
||||
import Data.Effect.Coroutine (Status (Continue, Done))
|
||||
import Data.Effect.State (get, put)
|
||||
import Data.Time (DiffTime)
|
||||
|
@ -115,3 +115,5 @@ branch a b = do
|
||||
world <- choose
|
||||
bool a b world
|
||||
{-# INLINE branch #-}
|
||||
|
||||
infixl 3 `branch`
|
||||
|
@ -10,11 +10,16 @@ where
|
||||
|
||||
import Control.Monad.Hefty (
|
||||
Eff,
|
||||
MemberHBy,
|
||||
interpret,
|
||||
interpretBy,
|
||||
interpretH,
|
||||
interpretHBy,
|
||||
interpretRecHWith,
|
||||
raiseH,
|
||||
runEff,
|
||||
send0,
|
||||
sendH,
|
||||
type (~>),
|
||||
)
|
||||
import Data.Effect.Key (KeyH (KeyH))
|
||||
@ -33,13 +38,27 @@ runShift :: (a -> Eff '[] ef ans) -> Eff '[ShiftFix ans '[] ef] ef a -> Eff '[]
|
||||
runShift f =
|
||||
interpretHBy f \e k ->
|
||||
evalShift $ case e of
|
||||
KeyH (Shift g) -> unShiftBase $ g (ShiftBase . raiseH . k) ShiftBase
|
||||
KeyH (Shift initiate) -> unShiftBase $ initiate (ShiftBase . raiseH . k) ShiftBase
|
||||
|
||||
withShift :: Eff '[ShiftFix ans '[] '[Eff eh ef]] '[Eff eh ef] ans -> Eff eh ef ans
|
||||
withShift = runEff . evalShift
|
||||
|
||||
runShift_ :: forall r ef. Eff (Shift_ (Eff r ef) ': r) ef ~> Eff r ef
|
||||
runShift_ = interpretRecHWith \(KeyH (Shift_' f)) k -> f k id
|
||||
runShift_ :: forall eh ef. Eff (Shift_ (Eff eh ef) ': eh) ef ~> Eff eh ef
|
||||
runShift_ = interpretRecHWith \(KeyH (Shift_' initiate)) k -> initiate k id
|
||||
|
||||
runReset :: forall r ef. Eff (Reset ': r) ef ~> Eff r ef
|
||||
runReset :: forall eh ef. Eff (Reset ': eh) ef ~> Eff eh ef
|
||||
runReset = interpretH \(Reset a) -> a
|
||||
|
||||
runShiftF :: Eff '[] (ShiftF (Eff '[] ef ans) ': ef) ans -> Eff '[] ef ans
|
||||
runShiftF = interpretBy pure \(ShiftF initiate) resume -> initiate resume
|
||||
|
||||
runShiftEff :: (Monad n) => (a -> n ans) -> Eff '[] '[ShiftF (n ans), n] a -> n ans
|
||||
runShiftEff f =
|
||||
runEff
|
||||
. interpretBy (send0 . f) \(ShiftF initiate) resume ->
|
||||
send0 $ initiate $ runEff . resume
|
||||
|
||||
runShiftAsF
|
||||
:: (MemberHBy ShiftKey (Shift' ans n) eh)
|
||||
=> Eff eh (ShiftF (n ans) ': ef) ~> Eff eh ef
|
||||
runShiftAsF = interpret $ sendH . fromShiftF
|
||||
|
95
heftia-effects/test/Test/Async.hs
Normal file
95
heftia-effects/test/Test/Async.hs
Normal file
@ -0,0 +1,95 @@
|
||||
{-# OPTIONS_GHC -fplugin GHC.TypeLits.KnownNat.Solver #-}
|
||||
|
||||
-- SPDX-License-Identifier: MPL-2.0
|
||||
|
||||
module Test.Async where
|
||||
|
||||
import Control.Monad.Hefty (Eff, liftIO, type (<|))
|
||||
import Control.Monad.Hefty.Concurrent.Async (
|
||||
Future (await, cancel, poll),
|
||||
HasAsync,
|
||||
async,
|
||||
perform,
|
||||
runAsyncIO,
|
||||
runAsyncSeq,
|
||||
)
|
||||
import Control.Monad.Hefty.Concurrent.Parallel (halt, runHaltIO)
|
||||
import Control.Monad.Hefty.Concurrent.Timer (runTimerIO)
|
||||
import Control.Monad.Hefty.State (get, modify, runStateIORef)
|
||||
import Control.Monad.Hefty.Unlift (runUnliftIO)
|
||||
import Data.Effect.Concurrent.Timer (sleep)
|
||||
import Data.Functor ((<&>))
|
||||
import Test.Hspec (Spec, it, shouldBe, shouldReturn, shouldThrow)
|
||||
import UnliftIO (throwIO)
|
||||
|
||||
spec_Async :: Spec
|
||||
spec_Async = do
|
||||
let
|
||||
prog :: (HasAsync ef f, IO <| ef) => Eff eh ef (String, String)
|
||||
prog = runTimerIO . runStateIORef "" $ do
|
||||
fu <- async do
|
||||
sleep 0.001
|
||||
modify (<> "B")
|
||||
get @String <&> (<> "C")
|
||||
modify (<> "A")
|
||||
perform $ await fu
|
||||
|
||||
it "sequential" do
|
||||
(s, a) <- runUnliftIO . runAsyncSeq $ prog
|
||||
s `shouldBe` "BA"
|
||||
a `shouldBe` "BC"
|
||||
|
||||
it "await" do
|
||||
(s, a) <- runUnliftIO . runAsyncIO $ prog
|
||||
s `shouldBe` "AB"
|
||||
a `shouldBe` "ABC"
|
||||
|
||||
it "cancel" do
|
||||
(s, ()) <- runUnliftIO . runTimerIO . runStateIORef "" . runAsyncIO $ do
|
||||
fu <- async do
|
||||
sleep 0.001
|
||||
modify (<> "B")
|
||||
modify (<> "A")
|
||||
perform $ cancel fu
|
||||
sleep 0.002
|
||||
s `shouldBe` "A"
|
||||
|
||||
it "poll" do
|
||||
(s, ()) <- runUnliftIO . runTimerIO . runStateIORef "" . runAsyncIO $ do
|
||||
fu <- async do
|
||||
sleep 0.001
|
||||
modify (<> "B")
|
||||
get @String <&> (<> "C")
|
||||
modify (<> "A")
|
||||
|
||||
x <- perform $ poll fu
|
||||
liftIO $ x `shouldBe` Nothing
|
||||
|
||||
sleep 0.002
|
||||
|
||||
x' <- perform $ poll fu
|
||||
liftIO $ x' `shouldBe` Just "ABC"
|
||||
|
||||
s `shouldBe` "AB"
|
||||
|
||||
it "catch IO exception" $
|
||||
do
|
||||
runUnliftIO . runAsyncIO $ do
|
||||
fu <- async $ throwIO $ userError "Test IO Exception"
|
||||
perform $ await fu
|
||||
`shouldThrow` (== userError "Test IO Exception")
|
||||
|
||||
it "suppress IO exception" $
|
||||
do
|
||||
runUnliftIO . runTimerIO . runAsyncIO $ do
|
||||
_ <- async $ throwIO $ userError "Test IO Exception"
|
||||
sleep 0.001
|
||||
pure ()
|
||||
`shouldReturn` ()
|
||||
|
||||
it "early exit" $
|
||||
do
|
||||
runUnliftIO . runAsyncIO . runHaltIO $ do
|
||||
_ <- async halt
|
||||
pure ()
|
||||
`shouldReturn` ()
|
@ -506,9 +506,10 @@ module Control.Monad.Hefty (
|
||||
-- *** For first-order effects
|
||||
reinterpret,
|
||||
reinterpretN,
|
||||
reinterpretNWith,
|
||||
reinterpretBy,
|
||||
reinterpretNBy,
|
||||
reinterpretWith,
|
||||
reinterpretNWith,
|
||||
reinterpretRecWith,
|
||||
reinterpretRecNWith,
|
||||
|
||||
@ -647,6 +648,8 @@ module Control.Monad.Hefty (
|
||||
liftIO,
|
||||
module Data.Effect.OpenUnion,
|
||||
module Data.Effect,
|
||||
module Data.Effect.Tag,
|
||||
module Data.Effect.Key,
|
||||
module Data.Effect.TH,
|
||||
module Data.Effect.HFunctor.TH,
|
||||
module Data.Effect.Key.TH,
|
||||
@ -690,6 +693,7 @@ import Control.Monad.Hefty.Interpret (
|
||||
reinterpretRecNHWith,
|
||||
reinterpretRecNWith,
|
||||
reinterpretRecWith,
|
||||
reinterpretWith,
|
||||
runEff,
|
||||
runPure,
|
||||
stateless,
|
||||
@ -794,8 +798,10 @@ import Control.Monad.IO.Class (liftIO)
|
||||
import Data.Effect
|
||||
import Data.Effect.HFunctor (HFunctor)
|
||||
import Data.Effect.HFunctor.TH
|
||||
import Data.Effect.Key
|
||||
import Data.Effect.Key.TH
|
||||
import Data.Effect.OpenUnion
|
||||
import Data.Effect.OpenUnion.Sum (type (:+:))
|
||||
import Data.Effect.TH
|
||||
import Data.Effect.Tag
|
||||
import Data.Kind (Type)
|
||||
|
@ -188,40 +188,40 @@ reinterpretN f = reinterpretRecNWith @n (stateless f)
|
||||
{-# INLINE reinterpretN #-}
|
||||
|
||||
reinterpretWith
|
||||
:: forall e ef' ef a
|
||||
:: forall e ef' ef eh a
|
||||
. (ef `IsSuffixOf` ef')
|
||||
=> Interpreter e (Eff '[] ef') a
|
||||
=> Interpreter e (Eff eh ef') a
|
||||
-> Eff '[] (e ': ef) a
|
||||
-> Eff '[] ef' a
|
||||
-> Eff eh ef' a
|
||||
reinterpretWith = reinterpretBy pure
|
||||
{-# INLINE reinterpretWith #-}
|
||||
|
||||
reinterpretNWith
|
||||
:: forall n e ef' ef a
|
||||
:: forall n e ef' ef eh a
|
||||
. (WeakenN n ef ef')
|
||||
=> Interpreter e (Eff '[] ef') a
|
||||
=> Interpreter e (Eff eh ef') a
|
||||
-> Eff '[] (e ': ef) a
|
||||
-> Eff '[] ef' a
|
||||
-> Eff eh ef' a
|
||||
reinterpretNWith = reinterpretNBy @n pure
|
||||
{-# INLINE reinterpretNWith #-}
|
||||
|
||||
reinterpretBy
|
||||
:: forall e ef' ef ans a
|
||||
:: forall e ef' ef eh ans a
|
||||
. (ef `IsSuffixOf` ef')
|
||||
=> (a -> Eff '[] ef' ans)
|
||||
-> Interpreter e (Eff '[] ef') ans
|
||||
=> (a -> Eff eh ef' ans)
|
||||
-> Interpreter e (Eff eh ef') ans
|
||||
-> Eff '[] (e ': ef) a
|
||||
-> Eff '[] ef' ans
|
||||
-> Eff eh ef' ans
|
||||
reinterpretBy ret hdl = iterAllEffHFBy ret nilH (hdl !+ flip sendUnionBy . weakens)
|
||||
{-# INLINE reinterpretBy #-}
|
||||
|
||||
reinterpretNBy
|
||||
:: forall n e ef' ef ans a
|
||||
:: forall n e ef' ef eh ans a
|
||||
. (WeakenN n ef ef')
|
||||
=> (a -> Eff '[] ef' ans)
|
||||
-> Interpreter e (Eff '[] ef') ans
|
||||
=> (a -> Eff eh ef' ans)
|
||||
-> Interpreter e (Eff eh ef') ans
|
||||
-> Eff '[] (e ': ef) a
|
||||
-> Eff '[] ef' ans
|
||||
-> Eff eh ef' ans
|
||||
reinterpretNBy ret hdl = iterAllEffHFBy ret nilH (hdl !+ flip sendUnionBy . weakenN @n)
|
||||
{-# INLINE reinterpretNBy #-}
|
||||
|
||||
@ -373,14 +373,14 @@ interposeWith = interposeBy pure
|
||||
If multiple instances of @e@ exist in the list, the one closest to the head (with the smallest index) will be targeted.
|
||||
-}
|
||||
interposeBy
|
||||
:: forall e ef ans a
|
||||
:: forall e ef eh ans a
|
||||
. (e <| ef)
|
||||
=> (a -> Eff '[] ef ans)
|
||||
=> (a -> Eff eh ef ans)
|
||||
-- ^ Value handler
|
||||
-> Interpreter e (Eff '[] ef) ans
|
||||
-> Interpreter e (Eff eh ef) ans
|
||||
-- ^ Effect handler
|
||||
-> Eff '[] ef a
|
||||
-> Eff '[] ef ans
|
||||
-> Eff eh ef ans
|
||||
interposeBy ret f = iterAllEffHFBy ret nilH \u -> maybe (`sendUnionBy` u) f (prj @e u)
|
||||
{-# INLINE interposeBy #-}
|
||||
|
||||
|
@ -15,6 +15,7 @@ Description : Open unions (type-indexed co-products) for extensible effects.
|
||||
-}
|
||||
module Data.Effect.OpenUnion.Internal where
|
||||
|
||||
import Data.Effect.Key (type (##>), type (#>))
|
||||
import Data.Proxy (Proxy (Proxy))
|
||||
import Data.Type.Equality (type (==))
|
||||
import GHC.TypeError (ErrorMessage (ShowType, Text, (:$$:), (:<>:)), TypeError)
|
||||
@ -68,13 +69,23 @@ lexi-lambda/freer-simple#3, which describes the motivation in more detail.
|
||||
-}
|
||||
instance {-# INCOHERENT #-} IfNotFound e r w
|
||||
|
||||
type LookupError (key :: kk) (w :: [ke]) =
|
||||
TypeError
|
||||
class IfKeyNotFound (key :: k) (r :: [e]) (w :: [e])
|
||||
|
||||
instance
|
||||
( TypeError
|
||||
( 'Text "The key ‘"
|
||||
':<>: 'ShowType key
|
||||
':<>: 'Text "’ does not exist in the type-level list"
|
||||
':$$: 'Text " ‘" ':<>: 'ShowType w ':<>: 'Text "’"
|
||||
)
|
||||
)
|
||||
=> IfKeyNotFound key '[] w
|
||||
|
||||
instance IfKeyNotFound key (key #> e ': r) w
|
||||
instance IfKeyNotFound key (key ##> e ': r) w
|
||||
instance {-# OVERLAPPABLE #-} (IfKeyNotFound key r w) => IfKeyNotFound key (e ': r) w
|
||||
|
||||
instance {-# INCOHERENT #-} IfKeyNotFound e r w
|
||||
|
||||
infixr 5 ++
|
||||
type family xs ++ ys where
|
||||
|
@ -31,11 +31,11 @@ import Data.Effect.OpenUnion.Internal (
|
||||
ElemAt,
|
||||
ElemIndex,
|
||||
FindElem,
|
||||
IfKeyNotFound,
|
||||
IfNotFound,
|
||||
IsSuffixOf,
|
||||
KnownLength,
|
||||
Length,
|
||||
LookupError,
|
||||
P (unP),
|
||||
PrefixLength,
|
||||
Reverse,
|
||||
@ -153,14 +153,11 @@ instance (FindElem e es, IfNotFound e es es) => Member e es where
|
||||
infix 3 <|
|
||||
type (<|) = Member
|
||||
|
||||
type MemberBy key e es = (key #> e <| es, Lookup key es ~ key #> e)
|
||||
type MemberBy key e es = (key #> e <| es, Lookup key es ~ key #> e, IfKeyNotFound key es es)
|
||||
|
||||
type Lookup key es = Lookup_ key es es
|
||||
|
||||
type family Lookup_ (key :: k) r w :: EffectF where
|
||||
Lookup_ key (key #> e ': _) w = key #> e
|
||||
Lookup_ key (_ ': r) w = Lookup_ key r w
|
||||
Lookup_ key '[] w = LookupError key w
|
||||
type family Lookup (key :: k) r :: EffectF where
|
||||
Lookup key (key #> e ': _) = key #> e
|
||||
Lookup key (_ ': r) = Lookup key r
|
||||
|
||||
{- | Orthogonal decomposition of a @'Union' (e ': es) :: 'EffectF'@. 'Right' value
|
||||
is returned if the @'Union' (e ': es) :: 'EffectF'@ contains @e :: 'EffectF'@, and
|
||||
|
@ -36,11 +36,11 @@ import Data.Effect.OpenUnion.Internal (
|
||||
ElemAt,
|
||||
ElemIndex,
|
||||
FindElem,
|
||||
IfKeyNotFound,
|
||||
IfNotFound,
|
||||
IsSuffixOf,
|
||||
KnownLength,
|
||||
Length,
|
||||
LookupError,
|
||||
P (unP),
|
||||
PrefixLength,
|
||||
Reverse,
|
||||
@ -110,14 +110,11 @@ instance (FindElem e es, IfNotFound e es es) => MemberH e es where
|
||||
infix 3 <<|
|
||||
type (<<|) = MemberH
|
||||
|
||||
type MemberHBy key e es = (key ##> e <<| es, LookupH key es ~ key ##> e)
|
||||
type MemberHBy key e es = (key ##> e <<| es, LookupH key es ~ key ##> e, IfKeyNotFound key es es)
|
||||
|
||||
type LookupH key es = LookupH_ key es es
|
||||
|
||||
type family LookupH_ (key :: k) r w :: EffectH where
|
||||
LookupH_ key (key ##> e ': _) w = key ##> e
|
||||
LookupH_ key (_ ': r) w = LookupH_ key r w
|
||||
LookupH_ key '[] w = LookupError key w
|
||||
type family LookupH (key :: k) r :: EffectH where
|
||||
LookupH key (key ##> e ': _) = key ##> e
|
||||
LookupH key (_ ': r) = LookupH key r
|
||||
|
||||
decompH :: (HFunctor e) => UnionH (e ': es) f a -> Either (UnionH es f a) (e f a)
|
||||
decompH (UnionH 0 a koi) = Right $ hfmap koi $ unsafeCoerce a
|
||||
|
Loading…
Reference in New Issue
Block a user