mirror of
https://github.com/ilyakooo0/streamly.git
synced 2024-10-06 21:27:35 +03:00
Remove stale code, rename the type to AsyncT
This commit is contained in:
parent
b9603ccef0
commit
4e26aa558b
@ -89,13 +89,13 @@ transient_basic = T.keep' $ T.threads 0 $ do
|
||||
assert (Prelude.length xs == 499000) $
|
||||
T.exit (Prelude.length xs)
|
||||
|
||||
amap :: (Int -> Int) -> Int -> A.AsynclyT IO Int
|
||||
amap :: (Int -> Int) -> Int -> A.AsyncT IO Int
|
||||
amap = Main.map
|
||||
|
||||
afilter :: (Int -> Bool) -> Int -> A.AsynclyT IO Int
|
||||
afilter :: (Int -> Bool) -> Int -> A.AsyncT IO Int
|
||||
afilter = Main.filter
|
||||
|
||||
adrop :: Int -> Int -> A.AsynclyT IO Int
|
||||
adrop :: Int -> Int -> A.AsyncT IO Int
|
||||
adrop = Main.drop
|
||||
|
||||
asyncly_basic :: IO Int
|
||||
|
@ -1,14 +1,14 @@
|
||||
import Control.Monad.IO.Class (liftIO)
|
||||
import Data.Monoid ((<>))
|
||||
|
||||
import Asyncly (AsynclyT, runAsyncly)
|
||||
import Asyncly (AsyncT, runAsyncly)
|
||||
|
||||
input :: AsynclyT IO String
|
||||
input :: AsyncT IO String
|
||||
input = do
|
||||
string <- liftIO getLine
|
||||
return string <> input
|
||||
|
||||
output :: AsynclyT IO String -> AsynclyT IO ()
|
||||
output :: AsyncT IO String -> AsyncT IO ()
|
||||
output strings = do
|
||||
s <- strings
|
||||
liftIO $ putStrLn s
|
||||
|
@ -37,7 +37,7 @@ main = do
|
||||
|
||||
-- Generates a value and then loops. Can be used to generate an infinite
|
||||
-- stream. Interleaves the generator and the consumer.
|
||||
loopTail :: Int -> AsynclyT IO Int
|
||||
loopTail :: Int -> AsyncT IO Int
|
||||
loopTail x = do
|
||||
liftIO $ putStrLn "LoopTail..."
|
||||
return x <> (if x < 3 then loopTail (x + 1) else empty)
|
||||
@ -45,7 +45,7 @@ main = do
|
||||
-- Loops and then generates a value. The consumer can run only after the
|
||||
-- loop has finished. An infinite generator will not let the consumer run
|
||||
-- at all.
|
||||
loopHead :: Int -> AsynclyT IO Int
|
||||
loopHead :: Int -> AsyncT IO Int
|
||||
loopHead x = do
|
||||
liftIO $ putStrLn "LoopHead..."
|
||||
(if x < 3 then loopHead (x + 1) else empty) <> return x
|
||||
@ -59,12 +59,12 @@ main = do
|
||||
-- then the action on the right is also spawned concurrently. In other
|
||||
-- words, both actions may run concurrently based on the need.
|
||||
|
||||
loopTailA :: Int -> AsynclyT IO Int
|
||||
loopTailA :: Int -> AsyncT IO Int
|
||||
loopTailA x = do
|
||||
liftIO $ putStrLn "LoopTailA..."
|
||||
return x <|> (if x < 3 then loopTailA (x + 1) else empty)
|
||||
|
||||
loopHeadA :: Int -> AsynclyT IO Int
|
||||
loopHeadA :: Int -> AsyncT IO Int
|
||||
loopHeadA x = do
|
||||
liftIO $ putStrLn "LoopHeadA..."
|
||||
(if x < 3 then loopHeadA (x + 1) else empty) <|> return x
|
||||
|
@ -2,5 +2,5 @@ import Data.List (sum)
|
||||
import Asyncly
|
||||
|
||||
main = do
|
||||
xs <- toList $ for [1..100] $ \x -> return (x * x) :: AsynclyT IO Int
|
||||
xs <- toList $ for [1..100] $ \x -> return (x * x) :: AsyncT IO Int
|
||||
print . sum $ xs
|
||||
|
@ -120,7 +120,7 @@
|
||||
-- of the state machine. It is an immutable state machine!
|
||||
|
||||
module Asyncly
|
||||
( AsynclyT
|
||||
( AsyncT
|
||||
, MonadAsync
|
||||
|
||||
-- * Run
|
||||
|
@ -4,7 +4,7 @@
|
||||
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
|
||||
|
||||
-- |
|
||||
-- Module : Asyncly.Threads
|
||||
-- Module : Asyncly.RunAsync
|
||||
-- Copyright : (c) 2017 Harendra Kumar
|
||||
--
|
||||
-- License : MIT-style
|
||||
@ -13,8 +13,7 @@
|
||||
-- Portability : GHC
|
||||
--
|
||||
module Asyncly.RunAsync
|
||||
( AsynclyT
|
||||
, runAsyncly
|
||||
( runAsyncly
|
||||
, toList
|
||||
, each
|
||||
, for
|
||||
@ -37,46 +36,15 @@ import Control.Monad.Trans.Recorder (MonadRecorder(..), RecorderT,
|
||||
Recording, blank, runRecorderT)
|
||||
import Asyncly.AsyncT
|
||||
|
||||
-- This transformer runs AsyncT under a state to manage the threads.
|
||||
-- Separating the state from the pure ListT transformer is cleaner but it
|
||||
-- results in 2x performance degradation. At some point if that performance is
|
||||
-- really needed we can combine the two.
|
||||
|
||||
{-
|
||||
newtype AsynclyT m a = AsynclyT { runAsynclyT :: AsyncT (StateT Context m) a }
|
||||
|
||||
deriving instance Monad m => Functor (AsynclyT m)
|
||||
deriving instance Monad m => Applicative (AsynclyT m)
|
||||
deriving instance Monad m => Alternative (AsynclyT m)
|
||||
deriving instance Monad m => Monad (AsynclyT m)
|
||||
deriving instance MonadIO m => MonadIO (AsynclyT m)
|
||||
deriving instance MonadThrow m => MonadThrow (AsynclyT m)
|
||||
instance MonadTrans (AsynclyT) where
|
||||
lift mx = AsynclyT $ AsyncT $ \c _ k -> lift mx >>= (\a -> (k a c Nothing))
|
||||
|
||||
-- XXX orphan instance, use a newtype instead?
|
||||
instance (Monad m, MonadRecorder m)
|
||||
=> MonadRecorder (StateT Context m) where
|
||||
getJournal = lift getJournal
|
||||
putJournal = lift . putJournal
|
||||
play = lift . play
|
||||
|
||||
deriving instance (Monad m, MonadRecorder m)
|
||||
=> MonadRecorder (AsynclyT m)
|
||||
|
||||
-}
|
||||
|
||||
type AsynclyT = AsyncT
|
||||
|
||||
------------------------------------------------------------------------------
|
||||
-- Running the monad
|
||||
------------------------------------------------------------------------------
|
||||
|
||||
-- | Run an 'AsynclyT m' computation, wait for it to finish and discard the
|
||||
-- | Run an 'AsyncT m' computation, wait for it to finish and discard the
|
||||
-- results.
|
||||
{-# INLINABLE runAsynclyLogged #-}
|
||||
runAsynclyLogged :: MonadAsync m
|
||||
=> Maybe (IORef [Recording]) -> AsynclyT m a -> m ()
|
||||
=> Maybe (IORef [Recording]) -> AsyncT m a -> m ()
|
||||
runAsynclyLogged lref m = run Nothing m
|
||||
|
||||
where
|
||||
@ -89,13 +57,13 @@ runAsynclyLogged lref m = run Nothing m
|
||||
|
||||
run ct mx = (runAsyncT mx) ct stop yield
|
||||
|
||||
runAsyncly :: MonadAsync m => AsynclyT m a -> m ()
|
||||
runAsyncly :: MonadAsync m => AsyncT m a -> m ()
|
||||
runAsyncly m = runAsynclyLogged Nothing m
|
||||
|
||||
-- | Run an 'AsynclyT m' computation and collect the results generated by each
|
||||
-- | Run an 'AsyncT m' computation and collect the results generated by each
|
||||
-- thread of the computation in a list.
|
||||
{-# INLINABLE toList #-}
|
||||
toList :: MonadAsync m => AsynclyT m a -> m [a]
|
||||
toList :: MonadAsync m => AsyncT m a -> m [a]
|
||||
toList m = run Nothing m
|
||||
|
||||
where
|
||||
@ -111,12 +79,12 @@ toList m = run Nothing m
|
||||
|
||||
-- | Run a given function concurrently on the list and collect the results.
|
||||
{-# INLINABLE for #-}
|
||||
for :: MonadAsync m => [a] -> (a -> AsyncT m b) -> AsynclyT m b
|
||||
for :: MonadAsync m => [a] -> (a -> AsyncT m b) -> AsyncT m b
|
||||
for xs f = foldr (<|>) empty $ map f xs
|
||||
|
||||
-- XXX rename to fromList?
|
||||
{-# INLINABLE each #-}
|
||||
each :: MonadAsync m => [a] -> AsynclyT m a
|
||||
each :: MonadAsync m => [a] -> AsyncT m a
|
||||
each xs = foldr (<>) empty $ map return xs
|
||||
|
||||
------------------------------------------------------------------------------
|
||||
@ -125,21 +93,21 @@ each xs = foldr (<>) empty $ map return xs
|
||||
|
||||
-- | Compose a computation using previously captured logs
|
||||
playRecording :: (MonadAsync m, MonadRecorder m)
|
||||
=> AsynclyT m a -> Recording -> AsynclyT m a
|
||||
=> AsyncT m a -> Recording -> AsyncT m a
|
||||
playRecording m recording = play recording >> m
|
||||
|
||||
-- | Resume an 'AsyncT' computation using previously recorded logs. The
|
||||
-- recording consists of a list of journals one for each thread in the
|
||||
-- computation.
|
||||
playRecordings :: (MonadAsync m, MonadRecorder m)
|
||||
=> AsynclyT m a -> [Recording] -> AsynclyT m a
|
||||
=> AsyncT m a -> [Recording] -> AsyncT m a
|
||||
playRecordings m logs = each logs >>= playRecording m
|
||||
|
||||
{-
|
||||
-- | Run an 'AsyncT' computation with recording enabled, wait for it to finish
|
||||
-- returning results for completed threads and recordings for paused threads.
|
||||
toListRecorded :: (MonadAsync m, MonadCatch m)
|
||||
=> AsynclyT m a -> m ([a], [Recording])
|
||||
=> AsyncT m a -> m ([a], [Recording])
|
||||
toListRecorded m = do
|
||||
resultsRef <- liftIO $ newIORef []
|
||||
lref <- liftIO $ newIORef []
|
||||
@ -152,7 +120,7 @@ toListRecorded m = do
|
||||
-- | Run an 'AsyncT' computation with recording enabled, wait for it to finish
|
||||
-- and discard the results and return the recordings for paused threads, if
|
||||
-- any.
|
||||
runAsynclyRecorded :: MonadAsync m => AsynclyT (RecorderT m) a -> m [Recording]
|
||||
runAsynclyRecorded :: MonadAsync m => AsyncT (RecorderT m) a -> m [Recording]
|
||||
runAsynclyRecorded m = do
|
||||
lref <- liftIO $ newIORef []
|
||||
runRecorderT blank (runAsynclyLogged (Just lref) m)
|
||||
|
Loading…
Reference in New Issue
Block a user