Use direct style fromSVar for zip and function apply

Improves concurrent zip perf by 2x. There is modest improvement in zip and
apply as well.
This commit is contained in:
Harendra Kumar 2019-11-22 04:32:44 +05:30
parent e16a980c5f
commit bdce4a09aa
6 changed files with 401 additions and 237 deletions

View File

@ -128,10 +128,10 @@ module Streamly
-- * Parallel Function Application
-- $application
, (|$)
, (|&)
, (|$.)
, (|&.)
, (IP.|$)
, (IP.|&)
, (IP.|$.)
, (IP.|&.)
, mkAsync
-- * Merging Streams

View File

@ -107,6 +107,7 @@ module Streamly.Internal.Data.SVar
, toStreamVar
, SVarStats (..)
, dumpSVar
, printSVar
)
where
@ -859,6 +860,11 @@ dumpSVar sv = do
<> "---------STATS-----------\n"
<> stats
printSVar :: SVar t m a -> String -> IO ()
printSVar sv how = do
svInfo <- dumpSVar sv
hPutStrLn stderr $ "\n" <> how <> "\n" <> svInfo
-- MVar diagnostics has some overhead - around 5% on asyncly null benchmark, we
-- can keep it on in production to debug problems quickly if and when they
-- happen, but it may result in unexpected output when threads are left hanging

View File

@ -131,6 +131,12 @@ module Streamly.Internal.Prelude
, toStream -- XXX rename to write?
, toStreamRev -- XXX rename to writeRev?
-- * Function application
, (|$)
, (|&)
, (|$.)
, (|&.)
-- * Transformation
, transform
@ -387,6 +393,9 @@ module Streamly.Internal.Prelude
, usingStateT
, runStateT
-- * Concurrency
, mkParallel
-- * Diagnostics
, inspectMode
@ -435,7 +444,7 @@ import Streamly.Internal.Data.Fold.Types (Fold (..), Fold2 (..))
import Streamly.Internal.Data.Unfold.Types (Unfold)
import Streamly.Internal.Memory.Array.Types (Array, writeNUnsafe)
-- import Streamly.Memory.Ring (Ring)
import Streamly.Internal.Data.SVar (MonadAsync, defState, adaptState)
import Streamly.Internal.Data.SVar (MonadAsync, State, defState, adaptState)
import Streamly.Streams.Async (mkAsync')
import Streamly.Streams.Combinators (inspectMode, maxYields)
import Streamly.Streams.Prelude
@ -1473,6 +1482,101 @@ toPure = foldr K.cons K.nil
toPureRev :: Monad m => SerialT m a -> m (SerialT Identity a)
toPureRev = foldl' (flip K.cons) K.nil
------------------------------------------------------------------------------
-- Concurrent Application
------------------------------------------------------------------------------
-- XXX mkParallel is usually used to make serial streams concurrent by
-- evaluating the stream in another thread. Ideally, we should not be creating
-- an SVar if the stream itself is concurrent. For example, if we are zipping
-- two streams that are concurrently composed, we should be able to pull from
-- the SVar of that concurrent stream instead of creating another redundant
-- SVar on top of it.
--
{-# INLINE_NORMAL mkParallel #-}
mkParallel :: (IsStream t, MonadAsync m)
=> State K.Stream m a -> t m a -> m (t m a)
mkParallel st m = fmap fromStreamD $ D.mkParallel st (toStreamD m)
-- mkParallel = Par.mkParallel
infixr 0 |$
infixr 0 |$.
infixl 1 |&
infixl 1 |&.
-- | Parallel function application operator for streams; just like the regular
-- function application operator '$' except that it is concurrent. The
-- following code prints a value every second even though each stage adds a 1
-- second delay.
--
--
-- @
-- drain $
-- S.mapM (\\x -> threadDelay 1000000 >> print x)
-- |$ S.repeatM (threadDelay 1000000 >> return 1)
-- @
--
-- /Concurrent/
--
-- @since 0.3.0
{-# INLINE (|$) #-}
(|$) :: (IsStream t, MonadAsync m) => (t m a -> t m b) -> t m a -> t m b
f |$ x = D.fromStreamD $
D.applyParallel (D.toStreamD . f . D.fromStreamD) (D.toStreamD x)
-- (|$) = Par.applyParallel
-- | Parallel reverse function application operator for streams; just like the
-- regular reverse function application operator '&' except that it is
-- concurrent.
--
-- @
-- drain $
-- S.repeatM (threadDelay 1000000 >> return 1)
-- |& S.mapM (\\x -> threadDelay 1000000 >> print x)
-- @
--
-- /Concurrent/
--
-- @since 0.3.0
{-# INLINE (|&) #-}
(|&) :: (IsStream t, MonadAsync m) => t m a -> (t m a -> t m b) -> t m b
x |& f = f |$ x
-- | Parallel function application operator; applies a @run@ or @fold@ function
-- to a stream such that the fold consumer and the stream producer run in
-- parallel. A @run@ or @fold@ function reduces the stream to a value in the
-- underlying monad. The @.@ at the end of the operator is a mnemonic for
-- termination of the stream.
--
-- @
-- S.foldlM' (\\_ a -> threadDelay 1000000 >> print a) ()
-- |$. S.repeatM (threadDelay 1000000 >> return 1)
-- @
--
-- /Concurrent/
--
-- @since 0.3.0
{-# INLINE (|$.) #-}
(|$.) :: (IsStream t, MonadAsync m) => (t m a -> m b) -> t m a -> m b
(|$.) = D.foldParallel
-- (|$.) = Par.foldParallel
-- | Parallel reverse function application operator for applying a run or fold
-- functions to a stream. Just like '|$.' except that the operands are reversed.
--
-- @
-- S.repeatM (threadDelay 1000000 >> return 1)
-- |&. S.foldlM' (\\_ a -> threadDelay 1000000 >> print a) ()
-- @
--
-- /Concurrent/
--
-- @since 0.3.0
{-# INLINE (|&.) #-}
(|&.) :: (IsStream t, MonadAsync m) => t m a -> (t m a -> m b) -> m b
x |&. f = f |$. x
------------------------------------------------------------------------------
-- General Transformation
------------------------------------------------------------------------------
@ -2036,6 +2140,27 @@ zipWith f m1 m2 = fromStreamS $ S.zipWith f (toStreamS m1) (toStreamS m2)
-- Parallel Zipping
------------------------------------------------------------------------------
-- The CPS version and the direct version of zipAsyncWithM below seem to have
-- identical performane. However, we need to use the StreamD version of
-- mkParallel which uses a direct implementation of fromSVar. In comparison to
-- CPS version of fromSVar the direct version gives a 2x improvement.
-- | Like 'zipWithM' but zips concurrently i.e. both the streams being zipped
-- are generated concurrently.
--
-- @since 0.4.0
{-# INLINE zipAsyncWithM #-}
zipAsyncWithM :: (IsStream t, MonadAsync m)
=> (a -> b -> m c) -> t m a -> t m b -> t m c
zipAsyncWithM f m1 m2 = K.mkStream $ \st stp sng yld -> do
ma <- mkParallel (adaptState st) m1
mb <- mkParallel (adaptState st) m2
K.foldStream st stp sng yld $ zipWithM f ma mb
{-
zipAsyncWithM f m1 m2 =
fromStreamD $ D.zipAsyncWithM f (toStreamD m1) (toStreamD m2)
-}
-- | Like 'zipWith' but zips concurrently i.e. both the streams being zipped
-- are generated concurrently.
--
@ -2043,22 +2168,7 @@ zipWith f m1 m2 = fromStreamS $ S.zipWith f (toStreamS m1) (toStreamS m2)
{-# INLINE zipAsyncWith #-}
zipAsyncWith :: (IsStream t, MonadAsync m)
=> (a -> b -> c) -> t m a -> t m b -> t m c
zipAsyncWith f m1 m2 = K.mkStream $ \st stp sng yld -> do
ma <- Par.mkParallel (adaptState st) m1
mb <- Par.mkParallel (adaptState st) m2
K.foldStream st stp sng yld $ zipWith f ma mb
-- | Like 'zipWithM' but zips concurrently i.e. both the streams being zipped
-- are generated concurrently.
--
-- @since 0.4.0
{-# INLINABLE zipAsyncWithM #-}
zipAsyncWithM :: (IsStream t, MonadAsync m)
=> (a -> b -> m c) -> t m a -> t m b -> t m c
zipAsyncWithM f m1 m2 = K.mkStream $ \st stp sng yld -> do
ma <- Par.mkParallel (adaptState st) m1
mb <- Par.mkParallel (adaptState st) m2
K.foldStream st stp sng yld $ zipWithM f ma mb
zipAsyncWith f = zipAsyncWithM (\a b -> return (f a b))
------------------------------------------------------------------------------
-- Comparison

View File

@ -28,13 +28,9 @@ module Streamly.Streams.Parallel
, parallelFst
, parallelMin
, tapAsync
-- * Function application
, mkParallel
, (|$)
, (|&)
, (|$.)
, (|&.)
, applyParallel
, foldParallel
)
where
@ -49,7 +45,7 @@ import Control.Monad.Reader.Class (MonadReader(..))
import Control.Monad.State.Class (MonadState(..))
import Control.Monad.Trans.Class (MonadTrans(lift))
import Data.Functor (void)
import Data.IORef (readIORef, writeIORef, newIORef)
import Data.IORef (readIORef, writeIORef)
import Data.Maybe (fromJust)
#if __GLASGOW_HASKELL__ < 808
import Data.Semigroup (Semigroup(..))
@ -63,14 +59,10 @@ import Streamly.Streams.Serial (map)
import Streamly.Streams.StreamK (IsStream(..), Stream, mkStream, foldStream,
foldStreamShared, adapt)
import Streamly.Internal.Data.Atomics (atomicModifyIORefCAS_)
import Streamly.Internal.Data.Time.Clock (Clock(..), getTime)
import Streamly.Internal.Data.SVar hiding (handleChildException)
import qualified Streamly.Streams.StreamK as K
import qualified Streamly.Streams.StreamD as D
import qualified Streamly.Internal.Data.SVar as SVar
import qualified Streamly.Internal.Data.Fold as FL
#include "Instances.hs"
@ -133,71 +125,6 @@ runOneLimited st m0 winfo = go m0
single a = sendit a >> (liftIO $ sendStop sv winfo)
yieldk a r = sendit a >> go r
-------------------------------------------------------------------------------
-- StreamD based worker routines
-------------------------------------------------------------------------------
-- Using StreamD the worker stream producing code can fuse with the code to
-- queue output to the SVar giving some perf boost.
--
-- Note that StreamD can only be used in limited situations, specifically, we
-- cannot implement joinStreamVarPar using this.
{-# INLINE_NORMAL pushWorkerParD #-}
pushWorkerParD :: MonadAsync m
=> State t m a -> SVar t m a -> D.Stream m a -> m ()
pushWorkerParD st sv xs =
if svarInspectMode sv
then forkWithDiag
else do
tid <-
case getYieldLimit st of
Nothing -> doFork (work Nothing)
(svarMrun sv)
(SVar.handleChildException sv)
Just _ -> doFork (workLim Nothing)
(svarMrun sv)
(SVar.handleChildException sv)
modifyThread sv tid
where
{-# NOINLINE work #-}
work info = (D.runFold (FL.toParallelSVar sv info) xs)
{-# NOINLINE workLim #-}
workLim info = D.runFold (FL.toParallelSVarLimited sv info) xs
{-# NOINLINE forkWithDiag #-}
forkWithDiag = do
-- We do not use workerCount in case of ParallelVar but still there is
-- no harm in maintaining it correctly.
liftIO $ atomicModifyIORefCAS_ (workerCount sv) $ \n -> n + 1
recordMaxWorkers sv
-- This allocation matters when significant number of workers are being
-- sent. We allocate it only when needed. The overhead increases by 4x.
winfo <-
case yieldRateInfo sv of
Nothing -> return Nothing
Just _ -> liftIO $ do
cntRef <- newIORef 0
t <- getTime Monotonic
lat <- newIORef (0, t)
return $ Just WorkerInfo
{ workerYieldMax = 0
, workerYieldCount = cntRef
, workerLatencyStart = lat
}
tid <-
case getYieldLimit st of
Nothing -> doFork (work winfo)
(svarMrun sv)
(SVar.handleChildException sv)
Just _ -> doFork (workLim winfo)
(svarMrun sv)
(SVar.handleChildException sv)
modifyThread sv tid
-------------------------------------------------------------------------------
-- Consing and appending a stream in parallel style
-------------------------------------------------------------------------------
@ -276,72 +203,38 @@ parallelMin = joinStreamVarPar ParallelVar StopAny
-- Convert a stream to parallel
------------------------------------------------------------------------------
_mkParallelK :: (IsStream t, MonadAsync m) => t m a -> m (t m a)
_mkParallelK m = do
sv <- newParallelVar StopNone defState
pushWorkerPar sv (runOne defState{streamVar = Just sv} $ toStream m)
return $ fromSVar sv
-- We may have to use it in higher order functions like concatMap, so use the
-- Normal phase for inlining.
{-# INLINE_NORMAL mkParallel #-}
mkParallel :: (IsStream t, MonadAsync m)
=> State Stream m a -> t m a -> m (t m a)
mkParallel st m = do
sv <- newParallelVar StopNone defState
pushWorkerParD st sv (D.toStreamD m)
-- pushWorkerPar sv (runOne st{streamVar = Just sv} $ toStream m)
D.toSVarParallel st sv $ D.toStreamD m
return $ fromSVar sv
------------------------------------------------------------------------------
-- Stream to stream concurrent function application
------------------------------------------------------------------------------
{-# INLINE _applyParallelK #-}
_applyParallelK :: (IsStream t, MonadAsync m)
=> (t m a -> t m b) -> t m a -> t m b
_applyParallelK f m = mkStream $ \st yld sng stp -> do
sv <- newParallelVar StopNone (adaptState st)
pushWorkerPar sv (runOne st{streamVar = Just sv} (toStream m))
foldStream st yld sng stp $ f $ fromSVar sv
-- XXX need to implement fromSVar in direct style for fused generation from
-- SVar.
{-# INLINE applyParallelD #-}
applyParallelD :: (IsStream t, MonadAsync m)
=> (t m a -> t m b) -> t m a -> t m b
applyParallelD f m = mkStream $ \st yld sng stp -> do
sv <- newParallelVar StopNone (adaptState st)
pushWorkerParD (adaptState st) sv (D.toStreamD m)
foldStream st yld sng stp $ f $ fromSVar sv
{-# INLINE applyParallel #-}
applyParallel :: (IsStream t, MonadAsync m)
=> (t m a -> t m b) -> t m a -> t m b
applyParallel = applyParallelD
applyParallel f m = mkStream $ \st yld sng stp -> do
sv <- newParallelVar StopNone (adaptState st)
-- pushWorkerPar sv (runOne st{streamVar = Just sv} (toStream m))
D.toSVarParallel (adaptState st) sv $ D.toStreamD m
foldStream st yld sng stp $ f $ fromSVar sv
------------------------------------------------------------------------------
-- Stream runner concurrent function application
------------------------------------------------------------------------------
{-# INLINE _foldParallelK #-}
_foldParallelK :: (IsStream t, MonadAsync m) => (t m a -> m b) -> t m a -> m b
_foldParallelK f m = do
sv <- newParallelVar StopNone defState
pushWorkerPar sv (runOne defState{streamVar = Just sv} $ toStream m)
f $ fromSVar sv
-- XXX need to implement fromSVar in direct style for fused generation from
-- SVar.
{-# INLINE foldParallelD #-}
foldParallelD :: (IsStream t, MonadAsync m) => (t m a -> m b) -> t m a -> m b
foldParallelD f m = do
sv <- newParallelVar StopNone defState
pushWorkerParD defState sv (D.toStreamD m)
f $ fromSVar sv
{-# INLINE foldParallel #-}
foldParallel :: (IsStream t, MonadAsync m) => (t m a -> m b) -> t m a -> m b
foldParallel = foldParallelD
foldParallel f m = do
sv <- newParallelVar StopNone defState
-- pushWorkerPar sv (runOne defState{streamVar = Just sv} $ toStream m)
D.toSVarParallel defState sv $ D.toStreamD m
f $ fromSVar sv
------------------------------------------------------------------------------
-- Parallel tee
@ -427,85 +320,6 @@ tapAsync f m = mkStream $ \st yld sng stp -> do
(svarMrun sv) (handleChildException sv)
foldStreamShared st yld sng stp (teeToSVar sv m)
------------------------------------------------------------------------------
-- Concurrent Application
------------------------------------------------------------------------------
infixr 0 |$
infixr 0 |$.
infixl 1 |&
infixl 1 |&.
-- | Parallel function application operator for streams; just like the regular
-- function application operator '$' except that it is concurrent. The
-- following code prints a value every second even though each stage adds a 1
-- second delay.
--
--
-- @
-- drain $
-- S.mapM (\\x -> threadDelay 1000000 >> print x)
-- |$ S.repeatM (threadDelay 1000000 >> return 1)
-- @
--
-- /Concurrent/
--
-- @since 0.3.0
{-# INLINE (|$) #-}
(|$) :: (IsStream t, MonadAsync m) => (t m a -> t m b) -> t m a -> t m b
f |$ x = applyParallel f x
-- | Parallel reverse function application operator for streams; just like the
-- regular reverse function application operator '&' except that it is
-- concurrent.
--
-- @
-- drain $
-- S.repeatM (threadDelay 1000000 >> return 1)
-- |& S.mapM (\\x -> threadDelay 1000000 >> print x)
-- @
--
-- /Concurrent/
--
-- @since 0.3.0
{-# INLINE (|&) #-}
(|&) :: (IsStream t, MonadAsync m) => t m a -> (t m a -> t m b) -> t m b
x |& f = f |$ x
-- | Parallel function application operator; applies a @run@ or @fold@ function
-- to a stream such that the fold consumer and the stream producer run in
-- parallel. A @run@ or @fold@ function reduces the stream to a value in the
-- underlying monad. The @.@ at the end of the operator is a mnemonic for
-- termination of the stream.
--
-- @
-- S.foldlM' (\\_ a -> threadDelay 1000000 >> print a) ()
-- |$. S.repeatM (threadDelay 1000000 >> return 1)
-- @
--
-- /Concurrent/
--
-- @since 0.3.0
{-# INLINE (|$.) #-}
(|$.) :: (IsStream t, MonadAsync m) => (t m a -> m b) -> t m a -> m b
f |$. x = foldParallel f x
-- | Parallel reverse function application operator for applying a run or fold
-- functions to a stream. Just like '|$.' except that the operands are reversed.
--
-- @
-- S.repeatM (threadDelay 1000000 >> return 1)
-- |&. S.foldlM' (\\_ a -> threadDelay 1000000 >> print a) ()
-- @
--
-- /Concurrent/
--
-- @since 0.3.0
{-# INLINE (|&.) #-}
(|&.) :: (IsStream t, MonadAsync m) => t m a -> (t m a -> m b) -> m b
x |&. f = f |$. x
------------------------------------------------------------------------------
-- ParallelT
------------------------------------------------------------------------------

View File

@ -24,21 +24,12 @@ import Control.Monad.Catch (throwM)
import Control.Monad.IO.Class (MonadIO(liftIO))
import Data.IORef (newIORef, readIORef, mkWeakIORef, writeIORef)
import Data.Maybe (isNothing)
#if __GLASGOW_HASKELL__ < 808
import Data.Semigroup ((<>))
#endif
import System.IO (hPutStrLn, stderr)
import Streamly.Internal.Data.Time.Clock (Clock(Monotonic), getTime)
import System.Mem (performMajorGC)
import Streamly.Internal.Data.SVar
import Streamly.Streams.StreamK hiding (reverse)
printSVar :: SVar t m a -> String -> IO ()
printSVar sv how = do
svInfo <- dumpSVar sv
hPutStrLn stderr $ "\n" <> how <> "\n" <> svInfo
-- | Pull a stream from an SVar.
{-# NOINLINE fromStreamVar #-}
fromStreamVar :: MonadAsync m => SVar Stream m a -> Stream m a

View File

@ -95,6 +95,7 @@ module Streamly.Streams.StreamD
, fromListM
, fromStreamK
, fromStreamD
, fromSVar
-- * Elimination
-- ** General Folds
@ -136,6 +137,7 @@ module Streamly.Streams.StreamD
, findM
, find
, (!!)
, toSVarParallel
-- ** Flattening nested streams
, concatMapM
@ -260,6 +262,8 @@ module Streamly.Streams.StreamD
, indexedR
, zipWith
, zipWithM
, zipAsyncWith
, zipAsyncWithM
-- * Comparisons
, eqBy
@ -292,19 +296,25 @@ module Streamly.Streams.StreamD
, resumeDecodeUtf8Either
, decodeUtf8Arrays
, decodeUtf8ArraysLenient
-- * Concurrent Application
, mkParallel
, applyParallel
, foldParallel
)
where
import Control.Exception (Exception, SomeException)
import Control.Monad (void)
import Control.Monad.Catch (MonadCatch)
import Control.Exception (Exception, SomeException, fromException)
import Control.Monad (void, when)
import Control.Monad.Catch (MonadCatch, throwM)
import Control.Monad.IO.Class (MonadIO(..))
import Control.Monad.Reader (ReaderT)
import Control.Monad.State.Strict (StateT)
import Control.Monad.Trans (MonadTrans(lift))
import Data.Bits (shiftR, shiftL, (.|.), (.&.))
import Data.Functor.Identity (Identity(..))
import Data.Maybe (fromJust, isJust)
import Data.IORef (newIORef, readIORef, mkWeakIORef, writeIORef)
import Data.Maybe (fromJust, isJust, isNothing)
import Data.Word (Word32)
import Foreign.Ptr (Ptr)
import Foreign.Storable (Storable(..))
@ -315,6 +325,7 @@ import GHC.Ptr (Ptr (..))
import GHC.Types (SPEC(..))
import GHC.Word (Word8(..))
import System.IO.Unsafe (unsafePerformIO)
import System.Mem (performMajorGC)
import Prelude
hiding (map, mapM, mapM_, repeat, foldr, last, take, filter,
takeWhile, drop, dropWhile, all, any, maximum, minimum, elem,
@ -325,18 +336,22 @@ import Prelude
import qualified Control.Monad.Catch as MC
import qualified Control.Monad.Reader as Reader
import qualified Control.Monad.State.Strict as State
import qualified Prelude
import Streamly.Internal.Data.Atomics (atomicModifyIORefCAS_)
import Streamly.Internal.Memory.Array.Types (Array(..))
import Streamly.Internal.Data.Fold.Types (Fold(..))
import Streamly.Internal.Data.Pipe.Types (Pipe(..), PipeState(..))
import Streamly.Internal.Data.SVar (MonadAsync, defState, adaptState)
import Streamly.Internal.Data.Time.Clock (Clock(Monotonic), getTime)
import Streamly.Internal.Data.Unfold.Types (Unfold(..))
import Streamly.Internal.Data.Strict (Tuple'(..))
import Streamly.Internal.Data.Stream.StreamD.Type
import Streamly.Internal.Data.SVar
import qualified Streamly.Internal.Data.Pipe.Types as Pipe
import qualified Streamly.Internal.Memory.Array.Types as A
import qualified Streamly.Internal.Data.Fold as FL
import qualified Streamly.Memory.Ring as RB
import qualified Streamly.Streams.StreamK as K
@ -648,10 +663,100 @@ fromListM = Stream step
toStreamD :: (K.IsStream t, Monad m) => t m a -> Stream m a
toStreamD = fromStreamK . K.toStream
-------------------------------------------------------------------------------
-- Generation from SVar
-------------------------------------------------------------------------------
data FromSVarState t m a =
FromSVarInit
| FromSVarRead (SVar t m a)
| FromSVarLoop (SVar t m a) [ChildEvent a]
| FromSVarDone (SVar t m a)
{-# INLINE_NORMAL fromSVar #-}
fromSVar :: (MonadAsync m) => SVar t m a -> Stream m a
fromSVar svar = Stream step FromSVarInit
where
{-# INLINE_LATE step #-}
step _ FromSVarInit = do
ref <- liftIO $ newIORef ()
_ <- liftIO $ mkWeakIORef ref hook
-- when this copy of svar gets garbage collected "ref" will get
-- garbage collected and our GC hook will be called.
let sv = svar{svarRef = Just ref}
return $ Skip (FromSVarRead sv)
where
{-# NOINLINE hook #-}
hook = do
when (svarInspectMode svar) $ do
r <- liftIO $ readIORef (svarStopTime (svarStats svar))
when (isNothing r) $
printSVar svar "SVar Garbage Collected"
cleanupSVar svar
-- If there are any SVars referenced by this SVar a GC will prompt
-- them to be cleaned up quickly.
when (svarInspectMode svar) performMajorGC
step _ (FromSVarRead sv) = do
list <- readOutputQ sv
-- Reversing the output is important to guarantee that we process the
-- outputs in the same order as they were generated by the constituent
-- streams.
return $ Skip $ FromSVarLoop sv (Prelude.reverse list)
step _ (FromSVarLoop sv []) = do
done <- postProcess sv
return $ Skip $ if done
then (FromSVarDone sv)
else (FromSVarRead sv)
step _ (FromSVarLoop sv (ev : es)) = do
case ev of
ChildYield a -> return $ Yield a (FromSVarLoop sv es)
ChildStop tid e -> do
accountThread sv tid
case e of
Nothing -> do
stop <- shouldStop tid
if stop
then do
liftIO (cleanupSVar sv)
return $ Skip (FromSVarDone sv)
else return $ Skip (FromSVarLoop sv es)
Just ex ->
case fromException ex of
Just ThreadAbort ->
return $ Skip (FromSVarLoop sv es)
Nothing -> liftIO (cleanupSVar sv) >> throwM ex
where
shouldStop tid =
case svarStopStyle sv of
StopNone -> return False
StopAny -> return True
StopBy -> do
sid <- liftIO $ readIORef (svarStopBy sv)
return $ if tid == sid then True else False
step _ (FromSVarDone sv) = do
when (svarInspectMode sv) $ do
t <- liftIO $ getTime Monotonic
liftIO $ writeIORef (svarStopTime (svarStats sv)) (Just t)
liftIO $ printSVar sv "SVar Done"
return Stop
-------------------------------------------------------------------------------
-- Hoisting the inner monad
-------------------------------------------------------------------------------
{-# INLINE_NORMAL hoist #-}
hoist :: Monad n => (forall x. m x -> n x) -> Stream m a -> Stream n a
hoist f (Stream step state) = (Stream step' state)
where
{-# INLINE_LATE step' #-}
step' gst st = do
r <- f $ step (adaptState gst) st
return $ case r of
@ -659,7 +764,7 @@ hoist f (Stream step state) = (Stream step' state)
Skip s -> Skip s
Stop -> Stop
{-# INLINE_NORMAL generally #-}
{-# INLINE generally #-}
generally :: Monad m => Stream Identity a -> Stream m a
generally = hoist (return . runIdentity)
@ -668,6 +773,7 @@ liftInner :: (Monad m, MonadTrans t, Monad (t m))
=> Stream m a -> Stream (t m) a
liftInner (Stream step state) = Stream step' state
where
{-# INLINE_LATE step' #-}
step' gst st = do
r <- lift $ step (adaptState gst) st
return $ case r of
@ -679,6 +785,7 @@ liftInner (Stream step state) = Stream step' state
runReaderT :: Monad m => s -> Stream (ReaderT s m) a -> Stream m a
runReaderT sval (Stream step state) = Stream step' state
where
{-# INLINE_LATE step' #-}
step' gst st = do
r <- Reader.runReaderT (step (adaptState gst) st) sval
return $ case r of
@ -690,6 +797,7 @@ runReaderT sval (Stream step state) = Stream step' state
evalStateT :: Monad m => s -> Stream (StateT s m) a -> Stream m a
evalStateT sval (Stream step state) = Stream step' (state, sval)
where
{-# INLINE_LATE step' #-}
step' gst (st, sv) = do
(r, sv') <- State.runStateT (step (adaptState gst) st) sv
return $ case r of
@ -701,6 +809,7 @@ evalStateT sval (Stream step state) = Stream step' (state, sval)
runStateT :: Monad m => s -> Stream (StateT s m) a -> Stream m (s, a)
runStateT sval (Stream step state) = Stream step' (state, sval)
where
{-# INLINE_LATE step' #-}
step' gst (st, sv) = do
(r, sv') <- State.runStateT (step (adaptState gst) st) sv
return $ case r of
@ -3426,6 +3535,37 @@ zipWithM f (Stream stepa ta) (Stream stepb tb) = Stream step (ta, tb, Nothing)
zipWith :: Monad m => (a -> b -> c) -> Stream m a -> Stream m b -> Stream m c
zipWith f = zipWithM (\a b -> return (f a b))
-- | Like 'zipWithM' but zips concurrently i.e. both the streams being zipped
-- are generated concurrently.
--
{-# INLINE_NORMAL zipAsyncWithM #-}
zipAsyncWithM :: MonadAsync m
=> (a -> b -> m c) -> Stream m a -> Stream m b -> Stream m c
zipAsyncWithM f m1 m2 = Stream step Nothing
where
{-# INLINE_LATE step #-}
step gst Nothing = do
ma <- mkParallel (adaptState gst) m1
mb <- mkParallel (adaptState gst) m2
return $ Skip (Just $ zipWithM f ma mb)
step gst (Just (UnStream step1 st)) = do
r <- step1 gst st
return $ case r of
Yield a s -> Yield a (Just $ Stream step1 s)
Skip s -> Skip (Just $ Stream step1 s)
Stop -> Stop
-- | Like 'zipWith' but zips concurrently i.e. both the streams being zipped
-- are generated concurrently.
--
{-# INLINE zipAsyncWith #-}
zipAsyncWith :: MonadAsync m
=> (a -> b -> c) -> Stream m a -> Stream m b -> Stream m c
zipAsyncWith f = zipAsyncWithM (\a b -> return (f a b))
------------------------------------------------------------------------------
-- Merging
------------------------------------------------------------------------------
@ -3983,3 +4123,106 @@ encodeUtf8 (Stream step state) = Stream step' (EncodeState state WNil)
Skip s -> Skip (EncodeState s WNil)
Stop -> Stop
step' _ (EncodeState s (WCons x xs)) = return $ Yield x (EncodeState s xs)
-------------------------------------------------------------------------------
-- Concurrent application and fold
-------------------------------------------------------------------------------
-- Using StreamD the worker stream producing code can fuse with the code to
-- queue output to the SVar giving some perf boost.
--
-- Note that StreamD can only be used in limited situations, specifically, we
-- cannot implement joinStreamVarPar using this.
--
-- XXX make sure that the SVar passed is a Parallel style SVar.
-- | Fold the supplied stream to the SVar asynchronously using Parallel
-- concurrency style.
-- {-# INLINE_NORMAL toSVarParallel #-}
{-# INLINE toSVarParallel #-}
toSVarParallel :: MonadAsync m
=> State t m a -> SVar t m a -> Stream m a -> m ()
toSVarParallel st sv xs =
if svarInspectMode sv
then forkWithDiag
else do
tid <-
case getYieldLimit st of
Nothing -> doFork (work Nothing)
(svarMrun sv)
(handleChildException sv)
Just _ -> doFork (workLim Nothing)
(svarMrun sv)
(handleChildException sv)
modifyThread sv tid
where
{-# NOINLINE work #-}
work info = (runFold (FL.toParallelSVar sv info) xs)
{-# NOINLINE workLim #-}
workLim info = runFold (FL.toParallelSVarLimited sv info) xs
{-# NOINLINE forkWithDiag #-}
forkWithDiag = do
-- We do not use workerCount in case of ParallelVar but still there is
-- no harm in maintaining it correctly.
liftIO $ atomicModifyIORefCAS_ (workerCount sv) $ \n -> n + 1
recordMaxWorkers sv
-- This allocation matters when significant number of workers are being
-- sent. We allocate it only when needed. The overhead increases by 4x.
winfo <-
case yieldRateInfo sv of
Nothing -> return Nothing
Just _ -> liftIO $ do
cntRef <- newIORef 0
t <- getTime Monotonic
lat <- newIORef (0, t)
return $ Just WorkerInfo
{ workerYieldMax = 0
, workerYieldCount = cntRef
, workerLatencyStart = lat
}
tid <-
case getYieldLimit st of
Nothing -> doFork (work winfo)
(svarMrun sv)
(handleChildException sv)
Just _ -> doFork (workLim winfo)
(svarMrun sv)
(handleChildException sv)
modifyThread sv tid
{-# INLINE_NORMAL mkParallel #-}
mkParallel :: MonadAsync m => State t m a -> Stream m a -> m (Stream m a)
mkParallel st m = do
sv <- newParallelVar StopNone defState
toSVarParallel st sv m
return $ fromSVar sv
{-# INLINE_NORMAL applyParallel #-}
applyParallel :: MonadAsync m
=> (Stream m a -> Stream m b) -> Stream m a -> Stream m b
applyParallel f m = Stream step Nothing
where
{-# INLINE_LATE step #-}
step gst Nothing = do
sv <- newParallelVar StopNone (adaptState gst)
toSVarParallel (adaptState gst) sv m
return $ Skip $ Just $ f (fromSVar sv)
step gst (Just (UnStream step1 st)) = do
r <- step1 gst st
return $ case r of
Yield a s -> Yield a (Just $ Stream step1 s)
Skip s -> Skip (Just $ Stream step1 s)
Stop -> Stop
{-# INLINE foldParallel #-}
foldParallel :: (K.IsStream t, MonadAsync m) => (t m a -> m b) -> t m a -> m b
foldParallel f m = do
sv <- newParallelVar StopNone defState
toSVarParallel defState sv (toStreamD m)
f $ fromStreamD $ fromSVar sv