mirror of
https://github.com/ilyakooo0/streamly.git
synced 2024-10-06 21:27:35 +03:00
Fix, cleanup and test discarding compositions
This commit is contained in:
parent
f5cfead136
commit
922c9ed018
@ -32,6 +32,11 @@ module Asyncly
|
||||
, each
|
||||
, gather
|
||||
|
||||
, discardAndThen
|
||||
, (*>>)
|
||||
, thenDiscard
|
||||
, (>>*)
|
||||
|
||||
, Log
|
||||
, Loggable
|
||||
, waitLogged
|
||||
|
@ -29,8 +29,11 @@ module Asyncly.AsyncT
|
||||
, async
|
||||
, makeAsync
|
||||
, each
|
||||
, (<**)
|
||||
, (**>)
|
||||
|
||||
, discardAndThen
|
||||
, (*>>)
|
||||
, thenDiscard
|
||||
, (>>*)
|
||||
|
||||
-- internal
|
||||
, dbg
|
||||
@ -488,13 +491,13 @@ each xs = foldl (<|>) empty $ map return xs
|
||||
-- | Runs a computation under a given thread limit. A limit of 0 means new
|
||||
-- tasks start synchronously in the current thread. New threads are created by
|
||||
-- 'parallel', and APIs that use parallel.
|
||||
threads :: MonadIO m => Int -> AsyncT m a -> AsyncT m a
|
||||
threads :: MonadAsync m => Int -> AsyncT m a -> AsyncT m a
|
||||
threads n process = AsyncT $ do
|
||||
oldCr <- gets threadCredit
|
||||
newCr <- liftIO $ newIORef n
|
||||
modify $ \s -> s { threadCredit = newCr }
|
||||
r <- runAsyncT $ process
|
||||
<** (AsyncT $ do
|
||||
>>* (AsyncT $ do
|
||||
modify $ \s -> s { threadCredit = oldCr }
|
||||
return (Just ())
|
||||
) -- restore old credit
|
||||
@ -588,10 +591,6 @@ instance (MonadBaseControl b m, MonadAsync m) => MonadBaseControl b (AsyncT m) w
|
||||
instance MonadAsync m => MonadThrow (AsyncT m) where
|
||||
throwM e = lift $ throwM e
|
||||
|
||||
------------------------------------------------------------------------------
|
||||
-- More operators, instances
|
||||
------------------------------------------------------------------------------
|
||||
|
||||
instance (Num a, Monad (AsyncT m)) => Num (AsyncT m a) where
|
||||
fromInteger = return . fromInteger
|
||||
mf + mg = (+) <$> mf <*> mg
|
||||
@ -600,47 +599,18 @@ instance (Num a, Monad (AsyncT m)) => Num (AsyncT m a) where
|
||||
abs f = f >>= return . abs
|
||||
signum f = f >>= return . signum
|
||||
|
||||
{-
|
||||
-- | Warning: Radically untyped stuff. handle with care
|
||||
getContinuations :: Monad m => StateM m [a -> AsyncT m b]
|
||||
getContinuations = do
|
||||
EventF { fcomp = fs } <- get
|
||||
return $ unsafeCoerce fs
|
||||
------------------------------------------------------------------------------
|
||||
-- Special compositions
|
||||
------------------------------------------------------------------------------
|
||||
|
||||
-- | Save a closure and a continuation ('x' and 'f' in 'x >>= f').
|
||||
setContinuation :: Monad m
|
||||
=> AsyncT m a -> (a -> AsyncT m b) -> [c -> AsyncT m c] -> StateM m ()
|
||||
setContinuation b c fs = do
|
||||
modify $ \EventF{..} -> EventF { xcomp = b
|
||||
, fcomp = unsafeCoerce c : fs
|
||||
, .. }
|
||||
|
||||
-- | Restore the continuations to the provided ones.
|
||||
-- | NOTE: Events are also cleared out.
|
||||
restoreStack :: MonadState EventF m => t -> m ()
|
||||
restoreStack fs = modify $ \EventF {..} ->
|
||||
EventF { event = Nothing, fcomp = (unsafeCoerce fs), .. }
|
||||
|
||||
-}
|
||||
infixr 1 >>*, *>>
|
||||
|
||||
{-
|
||||
(<***) :: AsyncT m a -> AsyncT m b -> AsyncT m a
|
||||
(<***) ma mb =
|
||||
AsyncT $ do
|
||||
fs <- getContinuations
|
||||
setContinuation ma (\x -> mb >> return x) fs
|
||||
a <- runAsyncT ma
|
||||
runAsyncT mb
|
||||
restoreStack fs
|
||||
return a
|
||||
|
||||
infixr 1 <***, <**, **>
|
||||
|
||||
-- | Run @b@ once, discarding its result when the first task in task set @a@
|
||||
-- has finished. Useful to start a singleton task after the first task has been
|
||||
-- setup.
|
||||
(<|) :: MonadIO m => AsyncT m a -> AsyncT m b -> AsyncT m a
|
||||
(<|) ma mb = AsyncT $ do
|
||||
afterFirst :: MonadIO m => AsyncT m a -> AsyncT m b -> AsyncT m a
|
||||
afterFirst ma mb = AsyncT $ do
|
||||
fs <- getContinuations
|
||||
ref <- liftIO $ newIORef False
|
||||
setContinuation ma (cont ref) fs
|
||||
@ -655,19 +625,31 @@ infixr 1 <***, <**, **>
|
||||
runAsyncT mb
|
||||
return $ Just x
|
||||
|
||||
-}
|
||||
(<|) :: MonadIO m => AsyncT m a -> AsyncT m b -> AsyncT m a
|
||||
(<|) = afterFirst
|
||||
-}
|
||||
|
||||
infixr 1 <**, **>
|
||||
-- | Run 'm a' in "isolation" and discard its result, and then run 'm b' and
|
||||
-- return its result. Isolation means that any alternative actions inside 'm
|
||||
-- a' are not continued to 'm b'.
|
||||
discardAndThen :: MonadAsync m => AsyncT m a -> AsyncT m b -> AsyncT m b
|
||||
discardAndThen ma mb = AsyncT $ do
|
||||
_ <- runAsyncT (ma >> mzero)
|
||||
runAsyncT mb
|
||||
|
||||
-- | Run @m a@ discarding its result before running @m b@.
|
||||
(**>) :: Monad m => AsyncT m a -> AsyncT m b -> AsyncT m b
|
||||
(**>) x y = AsyncT $ do
|
||||
_ <- runAsyncT x
|
||||
runAsyncT y
|
||||
-- | Same as 'discardAndThen'.
|
||||
(*>>) :: MonadAsync m => AsyncT m a -> AsyncT m b -> AsyncT m b
|
||||
(*>>) = discardAndThen
|
||||
|
||||
-- | Run @m b@ discarding its result, after the whole task set @m a@ is done.
|
||||
(<**) :: Monad m => AsyncT m a -> AsyncT m b -> AsyncT m a
|
||||
(<**) ma mb = AsyncT $ do
|
||||
a <- runAsyncT ma
|
||||
_ <- runAsyncT mb
|
||||
return a
|
||||
-- | Run 'm a' and then run 'm b' in "isolation" and return the result of 'm
|
||||
-- a'. Isolation means that any alternative actions inside 'm a' are not
|
||||
-- continued to 'm b' and the results of 'm b' are discarded.
|
||||
thenDiscard :: MonadAsync m => AsyncT m a -> AsyncT m b -> AsyncT m a
|
||||
thenDiscard ma mb = AsyncT $ do
|
||||
a <- runAsyncT ma
|
||||
_ <- runAsyncT (mb >> mzero)
|
||||
return a
|
||||
|
||||
-- | Same as 'thenDiscard'.
|
||||
(>>*) :: MonadAsync m => AsyncT m a -> AsyncT m b -> AsyncT m a
|
||||
(>>*) = thenDiscard
|
||||
|
17
test/Main.hs
17
test/Main.hs
@ -84,6 +84,23 @@ main = hspec $ do
|
||||
>>= return . \x -> (length x, length (nub x)))
|
||||
`shouldReturn` ((3, 3) :: (Int, Int))
|
||||
|
||||
-- Both 0 and 1 must be printed on console
|
||||
it "*>> works as expected" $
|
||||
(wait $ (async (liftIO (putStrLn "0") >> return 0)
|
||||
<|> (liftIO (putStrLn "1") >> return 1))
|
||||
*>> (liftIO (putStrLn "2") >> return 2))
|
||||
`shouldReturn` ([2] :: [Int])
|
||||
|
||||
-- Both 0 and 1 must be printed on console
|
||||
it ">>* works as expected" $
|
||||
(wait $ (return 2
|
||||
>>* (async (liftIO (putStrLn "0") >> return 0)
|
||||
<|> (liftIO (putStrLn "1") >> return 1))
|
||||
)
|
||||
>>* return 2
|
||||
)
|
||||
`shouldReturn` ([2] :: [Int])
|
||||
|
||||
generalExample :: AsyncT IO Int
|
||||
generalExample = do
|
||||
liftIO $ return ()
|
||||
|
Loading…
Reference in New Issue
Block a user