Keep just one basic async primitive, remove the rest

This commit is contained in:
Harendra Kumar 2017-07-01 01:35:49 +05:30
parent 120d0ed5f0
commit fe02431534
10 changed files with 110 additions and 218 deletions

View File

@ -9,24 +9,27 @@
--
module Strands
( wait
, waitLogged
( AsyncT
, MonadAsync
, wait
, wait_
, waitLogged_
, threads
, async
, waitEvents
, each
, gather
, sample
, threads
, Log
, Loggable
, waitLogged
, waitLogged_
, logged
, suspend
, withLog
, eachWithLog
, Log
, Loggable
)
where
import Strands.AsyncT (AsyncT)
import Strands.Context (Log, Loggable)
import Strands.Threads

View File

@ -49,24 +49,24 @@ dbg _ = return ()
------------------------------------------------------------------------------
instance Monad (AsyncT m) => Functor (AsyncT m) where
fmap f mx = do
x <- mx
return $ f x
fmap f mx = do
x <- mx
return $ f x
------------------------------------------------------------------------------
-- Applicative
------------------------------------------------------------------------------
instance Monad m => Applicative (AsyncT m) where
pure a = AsyncT . return $ Just a
m1 <*> m2 = do { x1 <- m1; x2 <- m2; return (x1 x2) }
pure a = AsyncT . return $ Just a
m1 <*> m2 = do { x1 <- m1; x2 <- m2; return (x1 x2) }
------------------------------------------------------------------------------
-- Alternative
------------------------------------------------------------------------------
instance Monad m => Alternative (AsyncT m) where
empty = AsyncT $ return Nothing
empty = AsyncT $ return Nothing
(<|>) x y = AsyncT $ do
mx <- runAsyncT x
loc <- getLocation
@ -80,31 +80,30 @@ instance Monad m => Alternative (AsyncT m) where
instance Monad m => Monad (AsyncT m) where
return = pure
m >>= f = AsyncT $ do
saveContext m f >> runAsyncT m >>= restoreContext >>= runAsyncT
instance Monad m => MonadPlus (AsyncT m) where
mzero = empty
mplus = (<|>)
mzero = empty
mplus = (<|>)
instance (Monoid a, Monad (AsyncT m)) => Monoid (AsyncT m a) where
mappend x y = mappend <$> x <*> y
mempty = return mempty
instance (Monoid a, Monad m) => Monoid (AsyncT m a) where
mappend x y = mappend <$> x <*> y
mempty = return mempty
------------------------------------------------------------------------------
-- MonadIO
------------------------------------------------------------------------------
instance MonadIO m => MonadIO (AsyncT m) where
liftIO mx = AsyncT $ liftIO mx >>= return . Just
liftIO mx = AsyncT $ liftIO mx >>= return . Just
-------------------------------------------------------------------------------
-- AsyncT transformer
-------------------------------------------------------------------------------
instance MonadTrans AsyncT where
lift mx = AsyncT $ lift mx >>= return . Just
lift mx = AsyncT $ lift mx >>= return . Just
instance MonadTransControl AsyncT where
type StT AsyncT a = (Maybe a, Context)
@ -126,7 +125,7 @@ instance (MonadBaseControl b m, MonadIO m) => MonadBaseControl b (AsyncT m) wher
{-# INLINABLE restoreM #-}
instance MonadThrow m => MonadThrow (AsyncT m) where
throwM e = lift $ throwM e
throwM e = lift $ throwM e
------------------------------------------------------------------------------
-- More operators, instances

View File

@ -25,7 +25,7 @@ where
import Control.Concurrent (ThreadId)
import Control.Concurrent.STM (TChan)
import Control.Exception (SomeException)
import Control.Monad.State (StateT, MonadPlus(..), MonadIO(..))
import Control.Monad.State (StateT, MonadPlus(..))
import qualified Control.Monad.Trans.State.Lazy as Lazy (get, gets, modify, put)
import Data.Dynamic (Typeable)
import Data.IORef (IORef)
@ -214,14 +214,13 @@ restoreContext x = do
Lazy.put ctx { currentm = unsafeCoerce mnext, fstack = fs }
return mnext
-- | We can retrieve a context at any point and resume that context at some
-- later point, upon resumption we start executing again from the same point
-- where the context was retrieved from.
--
-- This function composes the stored context to recover the computation.
--
composeContext :: (MonadIO m, MonadPlus m) => Context -> m a
composeContext :: Monad m => Context -> m a
composeContext Context { currentm = m
, fstack = fs
} =

View File

@ -1,24 +1,21 @@
{-# LANGUAGE ConstraintKinds #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE ScopedTypeVariables #-}
module Strands.Threads
( parallel
, waitEvents
, async
, each
, sample
, sync
, gather
--, react
, threads
( MonadAsync
, wait
, wait_
, threads
, async
, each
, gather
, logged
, suspend
, waitLogged
, waitLogged_
, logged
, suspend
, withLog
, eachWithLog
)
@ -26,12 +23,11 @@ where
import Control.Applicative ((<|>), empty)
import Control.Concurrent (ThreadId, forkIO, killThread,
myThreadId, threadDelay)
myThreadId)
import Control.Concurrent.STM (TChan, atomically, newTChan,
readTChan, tryReadTChan,
writeTChan)
import Control.Exception (ErrorCall (..),
SomeException (..), catch)
import Control.Exception (SomeException (..))
import qualified Control.Exception.Lifted as EL
import Control.Monad.Catch (MonadCatch, MonadThrow, throwM,
try)
@ -49,6 +45,8 @@ import Data.Maybe (isJust)
import Strands.AsyncT
import Strands.Context
type MonadAsync m = (MonadIO m, MonadBaseControl IO m, MonadThrow m)
------------------------------------------------------------------------------
-- Model of computation
------------------------------------------------------------------------------
@ -80,11 +78,11 @@ import Strands.Context
-- | Continue execution of the closure that we were executing when we migrated
-- to a new thread.
runContext :: MonadIO m => Context -> StateT Context m ()
runContext :: Monad m => Context -> StateT Context m ()
runContext ctx = do
let s = runAsyncT (composeContext ctx)
_ <- lift $ runStateT s ctx
return ()
let s = runAsyncT (composeContext ctx)
_ <- lift $ runStateT s ctx
return ()
------------------------------------------------------------------------------
-- Thread Management (creation, reaping and killing)
@ -180,7 +178,9 @@ signalQSemB sem = atomicModifyIORef sem $ \n -> (n + 1, ())
--
forkFinally1 :: (MonadIO m, MonadBaseControl IO m)
=> Context -> (Either SomeException () -> IO ()) -> StateT Context m ThreadId
=> Context
-> (Either SomeException () -> IO ())
-> StateT Context m ThreadId
forkFinally1 ctx preExit =
EL.mask $ \restore ->
liftBaseWith $ \runInIO -> forkIO $ do
@ -252,30 +252,18 @@ canFork context = do
-- in the same thread or in a new thread depending on the synch parameter and
-- the current thread quota.
--
resumeContextWith :: (MonadBaseControl IO m, MonadIO m, MonadThrow m)
resumeContextWith :: MonadAsync m
=> Context -- the context to resume
-> Bool -- force synchronous
-> (Context -> AsyncT m (StreamData a)) -- the action to execute in the resumed context
-> AsyncT m a -- the action to execute in the resumed context
-> StateT Context m ()
resumeContextWith context synch action = do
let ctx = setContextMailBox context (action context)
let ctx = setContextMailBox context action
can <- liftIO $ canFork context
case can && (not synch) of
False -> runContext ctx -- run synchronously
True -> forkContext ctx
instance Read SomeException where
readsPrec _n str = [(SomeException $ ErrorCall s, r)]
where [(s , r)] = read str
-- | 'StreamData' represents a task in a task stream being generated.
data StreamData a =
SMore a -- ^ More tasks to come
| SLast a -- ^ This is the last task
| SDone -- ^ No more tasks, we are done
| SError SomeException -- ^ An error occurred
deriving (Show, Read)
-- The current model is to start a new thread for every task. The input is
-- provided at the time of the creation and therefore no synchronization is
-- needed compared to a pool of threads contending to get the input from a
@ -296,173 +284,44 @@ data StreamData a =
-- parent task needs to terminate. Either the task is fully done or we handed
-- it over to another thread, in any case the current thread is done.
spawningParentDone :: MonadIO m => StateT Context m (Maybe (StreamData a))
spawningParentDone :: MonadIO m => StateT Context m (Maybe a)
spawningParentDone = do
loc <- getLocation
when (loc /= RemoteNode) $ setLocation WaitingParent
return Nothing
-- | Captures the state of the current computation at this point, starts a new
-- thread, passing the captured state, runs the argument computation in the new
-- thread, returns its value and continues the computation from the capture
-- point. The end result is as if this function just returned the value
-- generated by the argument computation albeit in a new thread.
-- thread, using the argument passed as the return value of the computation and
-- continues the computation from the capture point. The end result is as if
-- this function just returned the value passed as argument albeit in a new
-- thread.
--
-- If a new thread cannot be created then the computation is run in the same
-- thread, but the functional behavior remains the same.
spawnAsyncT :: (MonadIO m, MonadBaseControl IO m, MonadThrow m)
=> (Context -> AsyncT m (StreamData a))
-> AsyncT m (StreamData a)
spawnAsyncT func = AsyncT $ do
async :: MonadAsync m => AsyncT m a -> AsyncT m a
async action = AsyncT $ do
val <- takeContextMailBox
case val of
-- Child task
Right x -> runAsyncT x
Right m -> runAsyncT m
-- Spawning parent
Left ctx -> do
resumeContextWith ctx False func
resumeContextWith ctx False action
-- If we started the task asynchronously in a new thread then the
-- parent thread reaches here, immediately after spawning the task.
--
-- However, if the task was executed synchronously then we will
-- reach after it is completely done.
-- However, if the task was executed synchronously then we reach
-- here after it is completely done.
spawningParentDone
-- | Execute the specified IO action, resume the saved context returning the
-- output of the io action, continue this in a loop until the ioaction
-- indicates that its done.
loopContextWith :: (MonadIO m, MonadBaseControl IO m, MonadThrow m)
=> IO (StreamData a) -> Context -> AsyncT m (StreamData a)
loopContextWith ioaction context = AsyncT $ do
-- Note that the context that we are resuming may have been passed from
-- another thread, therefore we must inherit thread control related
-- parameters from the current context.
curCtx <- get
loop context
{ pendingThreads = pendingThreads curCtx
, childChannel = childChannel curCtx
}
where
loop ctx = do
streamData <- liftIO $ ioaction `catch`
\(e :: SomeException) -> return $ SError e
let resumeCtx synch = resumeContextWith ctx synch $ \_ ->
return streamData
case streamData of
SMore _ -> resumeCtx False >> loop ctx
_ -> resumeCtx True >> spawningParentDone
-- | Run an IO action one or more times to generate a stream of tasks. The IO
-- action returns a 'StreamData'. When it returns an 'SMore' or 'SLast' a new
-- task is triggered with the result value. If the return value is 'SMore', the
-- action is run again to generate the next task, otherwise task creation
-- stops.
--
-- Unless the maximum number of threads (set with 'threads') has been reached,
-- the task is generated in a new thread and the current thread returns a void
-- task.
parallel :: (MonadIO m, MonadBaseControl IO m, MonadThrow m)
=> IO (StreamData a) -> AsyncT m (StreamData a)
parallel ioaction = spawnAsyncT (loopContextWith ioaction)
-- | An task stream generator that produces an infinite stream of tasks by
-- running an IO computation in a loop. A task is triggered carrying the output
-- of the computation. See 'parallel' for notes on the return value.
waitEvents :: (MonadIO m, MonadBaseControl IO m, MonadThrow m)
=> IO a -> AsyncT m a
waitEvents io = do
mr <- parallel (SMore <$> io)
case mr of
SMore x -> return x
-- SError e -> back e
-- | Run an IO computation asynchronously and generate a single task carrying
-- the result of the computation when it completes. See 'parallel' for notes on
-- the return value.
async :: (MonadIO m, MonadBaseControl IO m, MonadThrow m)
=> IO a -> AsyncT m a
async io = do
mr <- parallel (SLast <$> io)
case mr of
SLast x -> return x
-- SError e -> back e
-- scatter
each :: (MonadIO m, MonadBaseControl IO m, MonadThrow m)
=> [a] -> AsyncT m a
each xs = foldl (<|>) empty $ map (async . return) xs
-- | Force an async computation to run synchronously. It can be useful in an
-- 'Alternative' composition to run the alternative only after finishing a
-- computation. Note that in Applicatives it might result in an undesired
-- serialization.
sync :: MonadIO m => AsyncT m a -> AsyncT m a
sync x = AsyncT $ do
setLocation RemoteNode
r <- runAsyncT x
setLocation Worker
return r
-- | An task stream generator that produces an infinite stream of tasks by
-- running an IO computation periodically at the specified time interval. The
-- task carries the result of the computation. A new task is generated only if
-- the output of the computation is different from the previous one. See
-- 'parallel' for notes on the return value.
sample :: (Eq a, MonadIO m, MonadBaseControl IO m, MonadThrow m)
=> IO a -> Int -> AsyncT m a
sample action interval = do
v <- liftIO action
prev <- liftIO $ newIORef v
waitEvents (loop action prev) <|> async (return v)
where loop act prev = loop'
where loop' = do
threadDelay interval
v <- act
v' <- readIORef prev
if v /= v' then writeIORef prev v >> return v else loop'
-- | Make a transient task generator from an asynchronous callback handler.
--
-- The first parameter is a callback. The second parameter is a value to be
-- returned to the callback; if the callback expects no return value it
-- can just be a @return ()@. The callback expects a setter function taking the
-- @eventdata@ as an argument and returning a value to the callback; this
-- function is supplied by 'react'.
--
-- Callbacks from foreign code can be wrapped into such a handler and hooked
-- into the transient monad using 'react'. Every time the callback is called it
-- generates a new task for the transient monad.
--
{-
react
:: (Monad m, MonadIO m)
=> ((eventdata -> m response) -> m ())
-> IO response
-> AsyncT m eventdata
react setHandler iob = AsyncT $ do
context <- get
case event context of
Nothing -> do
lift $ setHandler $ \dat ->do
resume (updateContextEvent context dat)
liftIO iob
loc <- getLocation
when (loc /= RemoteNode) $ setLocation WaitingParent
return Nothing
j@(Just _) -> do
put context{event=Nothing}
return $ unsafeCoerce j
-}
------------------------------------------------------------------------------
-- Controlling thread quota
------------------------------------------------------------------------------
@ -484,12 +343,6 @@ threads n process = AsyncT $ do
return r
{-
-- | Run a "non transient" computation within the underlying state monad, so it
-- is guaranteed that the computation neither can stop nor can trigger
-- additional events/threads.
noTrans :: Monad m => StateM m x -> AsyncT m x
noTrans x = AsyncT $ x >>= return . Just
-- This can be used to set, increase or decrease the existing limit. The limit
-- is shared by multiple threads and therefore needs to modified atomically.
-- Note that when there is no limit the limit is set to maxBound it can

View File

@ -42,6 +42,18 @@ library
, stm
, transformers-base
test-suite test
type: exitcode-stdio-1.0
main-is: Main.hs
hs-source-dirs: test
ghc-options: -O0 -Wall -fwarn-identities -fwarn-incomplete-record-updates -fwarn-incomplete-uni-patterns -fwarn-tabs
build-depends:
base >= 4.7 && < 5
, hspec >= 2.0 && < 3
, containers
, strands
default-language: Haskell2010
source-repository head
type: git
location: https://github.com/harendra-kumar/strands

View File

@ -11,10 +11,10 @@ main = do
liftIO $ hSetBuffering stdout LineBuffering
mainThread <- liftIO myThreadId
liftIO $ putStrLn $ "Main thread: " ++ show mainThread
x <- async (randomIO :: IO Int)
x <- async $ liftIO (randomIO :: IO Int)
liftIO $ putStrLn $ show x
y <- async (randomIO :: IO Int)
y <- async $ liftIO (randomIO :: IO Int)
liftIO $ threadDelay 1000000
evThread <- liftIO myThreadId

View File

@ -1,7 +1,13 @@
{-# LANGUAGE FlexibleContexts #-}
import Control.Applicative
import Control.Concurrent (threadDelay, myThreadId)
import Control.Monad.IO.Class (liftIO)
import Control.Monad.IO.Class (liftIO, MonadIO)
import System.Random (randomIO)
import System.IO
import Control.Monad (forever, mzero)
import Control.Monad.Trans.Control (MonadBaseControl)
import Data.IORef
import Control.Monad.Catch (MonadThrow)
import Strands
@ -9,14 +15,34 @@ main = wait_ $ threads 3 $ do
liftIO $ hSetBuffering stdout LineBuffering
mainThread <- liftIO myThreadId
liftIO $ putStrLn $ "Main thread: " ++ show mainThread
x <- sample (randomIO :: IO Int) 1000000
{-
evThread <- liftIO myThreadId
liftIO $ putStrLn $ "X Event thread: " ++ show evThread
liftIO $ putStrLn $ "\nX Event thread: " ++ show evThread
liftIO $ putStrLn $ "x = " ++ (show x)
-}
y <- sample (randomIO :: IO Int) 1000000
-- liftIO $ threadDelay 10000000
evThread <- liftIO myThreadId
liftIO $ putStrLn $ "Y Event thread: " ++ show evThread
liftIO $ putStrLn $ "y = " ++ (show y)
--liftIO $ putStrLn $ "\nY Event thread: " ++ show evThread
liftIO $ putStrLn $ "(x,y) = " ++ (show (x,y))
sample :: (Eq a, MonadIO m, MonadBaseControl IO m, MonadThrow m)
=> IO a -> Int -> AsyncT m a
sample action interval = do
v <- async $ liftIO action
prev <- liftIO $ newIORef v
async (return v) <|> loop action prev
where loop act prev = loop'
where loop' = do
liftIO $ threadDelay interval
v' <- liftIO $ readIORef prev
v <- liftIO act
if v /= v' then do
liftIO (writeIORef prev v)
async (return v) <|> loop'
else do
loop'

View File

@ -14,14 +14,14 @@ main = wait_ $ threads 3 $ do
eventA <|> eventB
eventA = do
x <- sample (randomIO :: IO Int) 1000000
x <- async $ liftIO (randomIO :: IO Int)
evThread <- liftIO myThreadId
liftIO $ putStrLn $ "X Event thread: " ++ show evThread
liftIO $ putStrLn $ "x = " ++ (show x)
return x
eventB = do
y <- sample (randomIO :: IO Int) 1000000
y <- async $ liftIO (randomIO :: IO Int)
-- liftIO $ threadDelay 10000000
evThread <- liftIO myThreadId

View File

@ -14,4 +14,4 @@ main = do
return x
putStrLn $ show xs
where
event n = async (do putStrLn ("event" ++ show n); return n :: IO Int)
event n = async $ liftIO $ (do putStrLn ("event" ++ show n); return n :: IO Int)

View File

@ -32,4 +32,4 @@ main = do
suspend
liftIO $ print ("C", r, x)
event n = async (do putStrLn ("event" ++ show n); return n :: IO Int)
event n = async $ liftIO $ (do putStrLn ("event" ++ show n); return n :: IO Int)