diff --git a/src/Streamly.hs b/src/Streamly.hs index 549918138..de96ea9b5 100644 --- a/src/Streamly.hs +++ b/src/Streamly.hs @@ -128,10 +128,10 @@ module Streamly -- * Parallel Function Application -- $application - , (|$) - , (|&) - , (|$.) - , (|&.) + , (IP.|$) + , (IP.|&) + , (IP.|$.) + , (IP.|&.) , mkAsync -- * Merging Streams diff --git a/src/Streamly/Internal/Data/SVar.hs b/src/Streamly/Internal/Data/SVar.hs index f0752083c..1df0bdb3d 100644 --- a/src/Streamly/Internal/Data/SVar.hs +++ b/src/Streamly/Internal/Data/SVar.hs @@ -107,6 +107,7 @@ module Streamly.Internal.Data.SVar , toStreamVar , SVarStats (..) , dumpSVar + , printSVar ) where @@ -859,6 +860,11 @@ dumpSVar sv = do <> "---------STATS-----------\n" <> stats +printSVar :: SVar t m a -> String -> IO () +printSVar sv how = do + svInfo <- dumpSVar sv + hPutStrLn stderr $ "\n" <> how <> "\n" <> svInfo + -- MVar diagnostics has some overhead - around 5% on asyncly null benchmark, we -- can keep it on in production to debug problems quickly if and when they -- happen, but it may result in unexpected output when threads are left hanging diff --git a/src/Streamly/Internal/Prelude.hs b/src/Streamly/Internal/Prelude.hs index 1f911b65c..e0e8338b4 100644 --- a/src/Streamly/Internal/Prelude.hs +++ b/src/Streamly/Internal/Prelude.hs @@ -131,6 +131,12 @@ module Streamly.Internal.Prelude , toStream -- XXX rename to write? , toStreamRev -- XXX rename to writeRev? + -- * Function application + , (|$) + , (|&) + , (|$.) + , (|&.) 
+ -- * Transformation , transform @@ -387,6 +393,9 @@ module Streamly.Internal.Prelude , usingStateT , runStateT + -- * Concurrency + , mkParallel + -- * Diagnostics , inspectMode @@ -435,7 +444,7 @@ import Streamly.Internal.Data.Fold.Types (Fold (..), Fold2 (..)) import Streamly.Internal.Data.Unfold.Types (Unfold) import Streamly.Internal.Memory.Array.Types (Array, writeNUnsafe) -- import Streamly.Memory.Ring (Ring) -import Streamly.Internal.Data.SVar (MonadAsync, defState, adaptState) +import Streamly.Internal.Data.SVar (MonadAsync, State, defState, adaptState) import Streamly.Streams.Async (mkAsync') import Streamly.Streams.Combinators (inspectMode, maxYields) import Streamly.Streams.Prelude @@ -1473,6 +1482,101 @@ toPure = foldr K.cons K.nil toPureRev :: Monad m => SerialT m a -> m (SerialT Identity a) toPureRev = foldl' (flip K.cons) K.nil +------------------------------------------------------------------------------ +-- Concurrent Application +------------------------------------------------------------------------------ + +-- XXX mkParallel is usually used to make serial streams concurrent by +-- evaluating the stream in another thread. Ideally, we should not be creating +-- an SVar if the stream itself is concurrent. For example, if we are zipping +-- two streams that are concurrently composed, we should be able to pull from +-- the SVar of that concurrent stream instead of creating another redundant +-- SVar on top of it. +-- +{-# INLINE_NORMAL mkParallel #-} +mkParallel :: (IsStream t, MonadAsync m) + => State K.Stream m a -> t m a -> m (t m a) +mkParallel st m = fmap fromStreamD $ D.mkParallel st (toStreamD m) +-- mkParallel = Par.mkParallel + +infixr 0 |$ +infixr 0 |$. + +infixl 1 |& +infixl 1 |&. + +-- | Parallel function application operator for streams; just like the regular +-- function application operator '$' except that it is concurrent. The +-- following code prints a value every second even though each stage adds a 1 +-- second delay. 
+-- +-- +-- @ +-- drain $ +-- S.mapM (\\x -> threadDelay 1000000 >> print x) +-- |$ S.repeatM (threadDelay 1000000 >> return 1) +-- @ +-- +-- /Concurrent/ +-- +-- @since 0.3.0 +{-# INLINE (|$) #-} +(|$) :: (IsStream t, MonadAsync m) => (t m a -> t m b) -> t m a -> t m b +f |$ x = D.fromStreamD $ + D.applyParallel (D.toStreamD . f . D.fromStreamD) (D.toStreamD x) +-- (|$) = Par.applyParallel + +-- | Parallel reverse function application operator for streams; just like the +-- regular reverse function application operator '&' except that it is +-- concurrent. +-- +-- @ +-- drain $ +-- S.repeatM (threadDelay 1000000 >> return 1) +-- |& S.mapM (\\x -> threadDelay 1000000 >> print x) +-- @ +-- +-- /Concurrent/ +-- +-- @since 0.3.0 +{-# INLINE (|&) #-} +(|&) :: (IsStream t, MonadAsync m) => t m a -> (t m a -> t m b) -> t m b +x |& f = f |$ x + +-- | Parallel function application operator; applies a @run@ or @fold@ function +-- to a stream such that the fold consumer and the stream producer run in +-- parallel. A @run@ or @fold@ function reduces the stream to a value in the +-- underlying monad. The @.@ at the end of the operator is a mnemonic for +-- termination of the stream. +-- +-- @ +-- S.foldlM' (\\_ a -> threadDelay 1000000 >> print a) () +-- |$. S.repeatM (threadDelay 1000000 >> return 1) +-- @ +-- +-- /Concurrent/ +-- +-- @since 0.3.0 +{-# INLINE (|$.) #-} +(|$.) :: (IsStream t, MonadAsync m) => (t m a -> m b) -> t m a -> m b +(|$.) = D.foldParallel +-- (|$.) = Par.foldParallel + +-- | Parallel reverse function application operator for applying run or fold +-- functions to a stream. Just like '|$.' except that the operands are reversed. +-- +-- @ +-- S.repeatM (threadDelay 1000000 >> return 1) +-- |&. S.foldlM' (\\_ a -> threadDelay 1000000 >> print a) () +-- @ +-- +-- /Concurrent/ +-- +-- @since 0.3.0 +{-# INLINE (|&.) #-} +(|&.) :: (IsStream t, MonadAsync m) => t m a -> (t m a -> m b) -> m b +x |&. f = f |$. 
x + ------------------------------------------------------------------------------ -- General Transformation ------------------------------------------------------------------------------ @@ -2036,6 +2140,27 @@ zipWith f m1 m2 = fromStreamS $ S.zipWith f (toStreamS m1) (toStreamS m2) -- Parallel Zipping ------------------------------------------------------------------------------ +-- The CPS version and the direct version of zipAsyncWithM below seem to have +-- identical performance. However, we need to use the StreamD version of +-- mkParallel which uses a direct implementation of fromSVar. In comparison to +-- CPS version of fromSVar the direct version gives a 2x improvement. + +-- | Like 'zipWithM' but zips concurrently i.e. both the streams being zipped +-- are generated concurrently. +-- +-- @since 0.4.0 +{-# INLINE zipAsyncWithM #-} +zipAsyncWithM :: (IsStream t, MonadAsync m) + => (a -> b -> m c) -> t m a -> t m b -> t m c +zipAsyncWithM f m1 m2 = K.mkStream $ \st stp sng yld -> do + ma <- mkParallel (adaptState st) m1 + mb <- mkParallel (adaptState st) m2 + K.foldStream st stp sng yld $ zipWithM f ma mb +{- +zipAsyncWithM f m1 m2 = + fromStreamD $ D.zipAsyncWithM f (toStreamD m1) (toStreamD m2) +-} + -- | Like 'zipWith' but zips concurrently i.e. both the streams being zipped -- are generated concurrently. -- @@ -2043,22 +2168,7 @@ zipWith f m1 m2 = fromStreamS $ S.zipWith f (toStreamS m1) (toStreamS m2) {-# INLINE zipAsyncWith #-} zipAsyncWith :: (IsStream t, MonadAsync m) => (a -> b -> c) -> t m a -> t m b -> t m c -zipAsyncWith f m1 m2 = K.mkStream $ \st stp sng yld -> do - ma <- Par.mkParallel (adaptState st) m1 - mb <- Par.mkParallel (adaptState st) m2 - K.foldStream st stp sng yld $ zipWith f ma mb - --- | Like 'zipWithM' but zips concurrently i.e. both the streams being zipped --- are generated concurrently. 
--- --- @since 0.4.0 -{-# INLINABLE zipAsyncWithM #-} -zipAsyncWithM :: (IsStream t, MonadAsync m) - => (a -> b -> m c) -> t m a -> t m b -> t m c -zipAsyncWithM f m1 m2 = K.mkStream $ \st stp sng yld -> do - ma <- Par.mkParallel (adaptState st) m1 - mb <- Par.mkParallel (adaptState st) m2 - K.foldStream st stp sng yld $ zipWithM f ma mb +zipAsyncWith f = zipAsyncWithM (\a b -> return (f a b)) ------------------------------------------------------------------------------ -- Comparison diff --git a/src/Streamly/Streams/Parallel.hs b/src/Streamly/Streams/Parallel.hs index 728d920ae..85c5d3ee3 100644 --- a/src/Streamly/Streams/Parallel.hs +++ b/src/Streamly/Streams/Parallel.hs @@ -28,13 +28,9 @@ module Streamly.Streams.Parallel , parallelFst , parallelMin , tapAsync - - -- * Function application , mkParallel - , (|$) - , (|&) - , (|$.) - , (|&.) + , applyParallel + , foldParallel ) where @@ -49,7 +45,7 @@ import Control.Monad.Reader.Class (MonadReader(..)) import Control.Monad.State.Class (MonadState(..)) import Control.Monad.Trans.Class (MonadTrans(lift)) import Data.Functor (void) -import Data.IORef (readIORef, writeIORef, newIORef) +import Data.IORef (readIORef, writeIORef) import Data.Maybe (fromJust) #if __GLASGOW_HASKELL__ < 808 import Data.Semigroup (Semigroup(..)) @@ -63,14 +59,10 @@ import Streamly.Streams.Serial (map) import Streamly.Streams.StreamK (IsStream(..), Stream, mkStream, foldStream, foldStreamShared, adapt) -import Streamly.Internal.Data.Atomics (atomicModifyIORefCAS_) -import Streamly.Internal.Data.Time.Clock (Clock(..), getTime) import Streamly.Internal.Data.SVar hiding (handleChildException) import qualified Streamly.Streams.StreamK as K import qualified Streamly.Streams.StreamD as D -import qualified Streamly.Internal.Data.SVar as SVar -import qualified Streamly.Internal.Data.Fold as FL #include "Instances.hs" @@ -133,71 +125,6 @@ runOneLimited st m0 winfo = go m0 single a = sendit a >> (liftIO $ sendStop sv winfo) yieldk a r = sendit a >> go 
r -------------------------------------------------------------------------------- --- StreamD based worker routines -------------------------------------------------------------------------------- - --- Using StreamD the worker stream producing code can fuse with the code to --- queue output to the SVar giving some perf boost. --- --- Note that StreamD can only be used in limited situations, specifically, we --- cannot implement joinStreamVarPar using this. - -{-# INLINE_NORMAL pushWorkerParD #-} -pushWorkerParD :: MonadAsync m - => State t m a -> SVar t m a -> D.Stream m a -> m () -pushWorkerParD st sv xs = - if svarInspectMode sv - then forkWithDiag - else do - tid <- - case getYieldLimit st of - Nothing -> doFork (work Nothing) - (svarMrun sv) - (SVar.handleChildException sv) - Just _ -> doFork (workLim Nothing) - (svarMrun sv) - (SVar.handleChildException sv) - modifyThread sv tid - - where - - {-# NOINLINE work #-} - work info = (D.runFold (FL.toParallelSVar sv info) xs) - - {-# NOINLINE workLim #-} - workLim info = D.runFold (FL.toParallelSVarLimited sv info) xs - - {-# NOINLINE forkWithDiag #-} - forkWithDiag = do - -- We do not use workerCount in case of ParallelVar but still there is - -- no harm in maintaining it correctly. - liftIO $ atomicModifyIORefCAS_ (workerCount sv) $ \n -> n + 1 - recordMaxWorkers sv - -- This allocation matters when significant number of workers are being - -- sent. We allocate it only when needed. The overhead increases by 4x. 
- winfo <- - case yieldRateInfo sv of - Nothing -> return Nothing - Just _ -> liftIO $ do - cntRef <- newIORef 0 - t <- getTime Monotonic - lat <- newIORef (0, t) - return $ Just WorkerInfo - { workerYieldMax = 0 - , workerYieldCount = cntRef - , workerLatencyStart = lat - } - tid <- - case getYieldLimit st of - Nothing -> doFork (work winfo) - (svarMrun sv) - (SVar.handleChildException sv) - Just _ -> doFork (workLim winfo) - (svarMrun sv) - (SVar.handleChildException sv) - modifyThread sv tid - ------------------------------------------------------------------------------- -- Consing and appending a stream in parallel style ------------------------------------------------------------------------------- @@ -276,72 +203,38 @@ parallelMin = joinStreamVarPar ParallelVar StopAny -- Convert a stream to parallel ------------------------------------------------------------------------------ -_mkParallelK :: (IsStream t, MonadAsync m) => t m a -> m (t m a) -_mkParallelK m = do - sv <- newParallelVar StopNone defState - pushWorkerPar sv (runOne defState{streamVar = Just sv} $ toStream m) - return $ fromSVar sv - --- We may have to use it in higher order functions like concatMap, so use the --- Normal phase for inlining. 
-{-# INLINE_NORMAL mkParallel #-} mkParallel :: (IsStream t, MonadAsync m) => State Stream m a -> t m a -> m (t m a) mkParallel st m = do sv <- newParallelVar StopNone defState - pushWorkerParD st sv (D.toStreamD m) + -- pushWorkerPar sv (runOne st{streamVar = Just sv} $ toStream m) + D.toSVarParallel st sv $ D.toStreamD m return $ fromSVar sv ------------------------------------------------------------------------------ -- Stream to stream concurrent function application ------------------------------------------------------------------------------ -{-# INLINE _applyParallelK #-} -_applyParallelK :: (IsStream t, MonadAsync m) - => (t m a -> t m b) -> t m a -> t m b -_applyParallelK f m = mkStream $ \st yld sng stp -> do - sv <- newParallelVar StopNone (adaptState st) - pushWorkerPar sv (runOne st{streamVar = Just sv} (toStream m)) - foldStream st yld sng stp $ f $ fromSVar sv - --- XXX need to implement fromSVar in direct style for fused generation from --- SVar. -{-# INLINE applyParallelD #-} -applyParallelD :: (IsStream t, MonadAsync m) - => (t m a -> t m b) -> t m a -> t m b -applyParallelD f m = mkStream $ \st yld sng stp -> do - sv <- newParallelVar StopNone (adaptState st) - pushWorkerParD (adaptState st) sv (D.toStreamD m) - foldStream st yld sng stp $ f $ fromSVar sv - {-# INLINE applyParallel #-} applyParallel :: (IsStream t, MonadAsync m) => (t m a -> t m b) -> t m a -> t m b -applyParallel = applyParallelD +applyParallel f m = mkStream $ \st yld sng stp -> do + sv <- newParallelVar StopNone (adaptState st) + -- pushWorkerPar sv (runOne st{streamVar = Just sv} (toStream m)) + D.toSVarParallel (adaptState st) sv $ D.toStreamD m + foldStream st yld sng stp $ f $ fromSVar sv ------------------------------------------------------------------------------ -- Stream runner concurrent function application ------------------------------------------------------------------------------ -{-# INLINE _foldParallelK #-} -_foldParallelK :: (IsStream t, MonadAsync m) => 
(t m a -> m b) -> t m a -> m b -_foldParallelK f m = do - sv <- newParallelVar StopNone defState - pushWorkerPar sv (runOne defState{streamVar = Just sv} $ toStream m) - f $ fromSVar sv - --- XXX need to implement fromSVar in direct style for fused generation from --- SVar. -{-# INLINE foldParallelD #-} -foldParallelD :: (IsStream t, MonadAsync m) => (t m a -> m b) -> t m a -> m b -foldParallelD f m = do - sv <- newParallelVar StopNone defState - pushWorkerParD defState sv (D.toStreamD m) - f $ fromSVar sv - {-# INLINE foldParallel #-} foldParallel :: (IsStream t, MonadAsync m) => (t m a -> m b) -> t m a -> m b -foldParallel = foldParallelD +foldParallel f m = do + sv <- newParallelVar StopNone defState + -- pushWorkerPar sv (runOne defState{streamVar = Just sv} $ toStream m) + D.toSVarParallel defState sv $ D.toStreamD m + f $ fromSVar sv ------------------------------------------------------------------------------ -- Parallel tee @@ -427,85 +320,6 @@ tapAsync f m = mkStream $ \st yld sng stp -> do (svarMrun sv) (handleChildException sv) foldStreamShared st yld sng stp (teeToSVar sv m) ------------------------------------------------------------------------------- --- Concurrent Application ------------------------------------------------------------------------------- - -infixr 0 |$ -infixr 0 |$. - -infixl 1 |& -infixl 1 |&. - --- | Parallel function application operator for streams; just like the regular --- function application operator '$' except that it is concurrent. The --- following code prints a value every second even though each stage adds a 1 --- second delay. 
--- --- --- @ --- drain $ --- S.mapM (\\x -> threadDelay 1000000 >> print x) --- |$ S.repeatM (threadDelay 1000000 >> return 1) --- @ --- --- /Concurrent/ --- --- @since 0.3.0 -{-# INLINE (|$) #-} -(|$) :: (IsStream t, MonadAsync m) => (t m a -> t m b) -> t m a -> t m b -f |$ x = applyParallel f x - --- | Parallel reverse function application operator for streams; just like the --- regular reverse function application operator '&' except that it is --- concurrent. --- --- @ --- drain $ --- S.repeatM (threadDelay 1000000 >> return 1) --- |& S.mapM (\\x -> threadDelay 1000000 >> print x) --- @ --- --- /Concurrent/ --- --- @since 0.3.0 -{-# INLINE (|&) #-} -(|&) :: (IsStream t, MonadAsync m) => t m a -> (t m a -> t m b) -> t m b -x |& f = f |$ x - --- | Parallel function application operator; applies a @run@ or @fold@ function --- to a stream such that the fold consumer and the stream producer run in --- parallel. A @run@ or @fold@ function reduces the stream to a value in the --- underlying monad. The @.@ at the end of the operator is a mnemonic for --- termination of the stream. --- --- @ --- S.foldlM' (\\_ a -> threadDelay 1000000 >> print a) () --- |$. S.repeatM (threadDelay 1000000 >> return 1) --- @ --- --- /Concurrent/ --- --- @since 0.3.0 -{-# INLINE (|$.) #-} -(|$.) :: (IsStream t, MonadAsync m) => (t m a -> m b) -> t m a -> m b -f |$. x = foldParallel f x - --- | Parallel reverse function application operator for applying a run or fold --- functions to a stream. Just like '|$.' except that the operands are reversed. --- --- @ --- S.repeatM (threadDelay 1000000 >> return 1) --- |&. S.foldlM' (\\_ a -> threadDelay 1000000 >> print a) () --- @ --- --- /Concurrent/ --- --- @since 0.3.0 -{-# INLINE (|&.) #-} -(|&.) :: (IsStream t, MonadAsync m) => t m a -> (t m a -> m b) -> m b -x |&. f = f |$. 
x - ------------------------------------------------------------------------------ -- ParallelT ------------------------------------------------------------------------------ diff --git a/src/Streamly/Streams/SVar.hs b/src/Streamly/Streams/SVar.hs index 62d9abdf3..b7a4b786a 100644 --- a/src/Streamly/Streams/SVar.hs +++ b/src/Streamly/Streams/SVar.hs @@ -24,21 +24,12 @@ import Control.Monad.Catch (throwM) import Control.Monad.IO.Class (MonadIO(liftIO)) import Data.IORef (newIORef, readIORef, mkWeakIORef, writeIORef) import Data.Maybe (isNothing) -#if __GLASGOW_HASKELL__ < 808 -import Data.Semigroup ((<>)) -#endif -import System.IO (hPutStrLn, stderr) import Streamly.Internal.Data.Time.Clock (Clock(Monotonic), getTime) import System.Mem (performMajorGC) import Streamly.Internal.Data.SVar import Streamly.Streams.StreamK hiding (reverse) -printSVar :: SVar t m a -> String -> IO () -printSVar sv how = do - svInfo <- dumpSVar sv - hPutStrLn stderr $ "\n" <> how <> "\n" <> svInfo - -- | Pull a stream from an SVar. {-# NOINLINE fromStreamVar #-} fromStreamVar :: MonadAsync m => SVar Stream m a -> Stream m a diff --git a/src/Streamly/Streams/StreamD.hs b/src/Streamly/Streams/StreamD.hs index bb527de7b..3125c4bb2 100644 --- a/src/Streamly/Streams/StreamD.hs +++ b/src/Streamly/Streams/StreamD.hs @@ -95,6 +95,7 @@ module Streamly.Streams.StreamD , fromListM , fromStreamK , fromStreamD + , fromSVar -- * Elimination -- ** General Folds @@ -136,6 +137,7 @@ module Streamly.Streams.StreamD , findM , find , (!!) 
+ , toSVarParallel -- ** Flattening nested streams , concatMapM @@ -260,6 +262,8 @@ module Streamly.Streams.StreamD , indexedR , zipWith , zipWithM + , zipAsyncWith + , zipAsyncWithM -- * Comparisons , eqBy @@ -292,19 +296,25 @@ module Streamly.Streams.StreamD , resumeDecodeUtf8Either , decodeUtf8Arrays , decodeUtf8ArraysLenient + + -- * Concurrent Application + , mkParallel + , applyParallel + , foldParallel ) where -import Control.Exception (Exception, SomeException) -import Control.Monad (void) -import Control.Monad.Catch (MonadCatch) +import Control.Exception (Exception, SomeException, fromException) +import Control.Monad (void, when) +import Control.Monad.Catch (MonadCatch, throwM) import Control.Monad.IO.Class (MonadIO(..)) import Control.Monad.Reader (ReaderT) import Control.Monad.State.Strict (StateT) import Control.Monad.Trans (MonadTrans(lift)) import Data.Bits (shiftR, shiftL, (.|.), (.&.)) import Data.Functor.Identity (Identity(..)) -import Data.Maybe (fromJust, isJust) +import Data.IORef (newIORef, readIORef, mkWeakIORef, writeIORef) +import Data.Maybe (fromJust, isJust, isNothing) import Data.Word (Word32) import Foreign.Ptr (Ptr) import Foreign.Storable (Storable(..)) @@ -315,6 +325,7 @@ import GHC.Ptr (Ptr (..)) import GHC.Types (SPEC(..)) import GHC.Word (Word8(..)) import System.IO.Unsafe (unsafePerformIO) +import System.Mem (performMajorGC) import Prelude hiding (map, mapM, mapM_, repeat, foldr, last, take, filter, takeWhile, drop, dropWhile, all, any, maximum, minimum, elem, @@ -325,18 +336,22 @@ import Prelude import qualified Control.Monad.Catch as MC import qualified Control.Monad.Reader as Reader import qualified Control.Monad.State.Strict as State +import qualified Prelude +import Streamly.Internal.Data.Atomics (atomicModifyIORefCAS_) import Streamly.Internal.Memory.Array.Types (Array(..)) import Streamly.Internal.Data.Fold.Types (Fold(..)) import Streamly.Internal.Data.Pipe.Types (Pipe(..), PipeState(..)) -import 
Streamly.Internal.Data.SVar (MonadAsync, defState, adaptState) +import Streamly.Internal.Data.Time.Clock (Clock(Monotonic), getTime) import Streamly.Internal.Data.Unfold.Types (Unfold(..)) import Streamly.Internal.Data.Strict (Tuple'(..)) import Streamly.Internal.Data.Stream.StreamD.Type +import Streamly.Internal.Data.SVar import qualified Streamly.Internal.Data.Pipe.Types as Pipe import qualified Streamly.Internal.Memory.Array.Types as A +import qualified Streamly.Internal.Data.Fold as FL import qualified Streamly.Memory.Ring as RB import qualified Streamly.Streams.StreamK as K @@ -648,10 +663,100 @@ fromListM = Stream step toStreamD :: (K.IsStream t, Monad m) => t m a -> Stream m a toStreamD = fromStreamK . K.toStream +------------------------------------------------------------------------------- +-- Generation from SVar +------------------------------------------------------------------------------- + +data FromSVarState t m a = + FromSVarInit + | FromSVarRead (SVar t m a) + | FromSVarLoop (SVar t m a) [ChildEvent a] + | FromSVarDone (SVar t m a) + +{-# INLINE_NORMAL fromSVar #-} +fromSVar :: (MonadAsync m) => SVar t m a -> Stream m a +fromSVar svar = Stream step FromSVarInit + where + + {-# INLINE_LATE step #-} + step _ FromSVarInit = do + ref <- liftIO $ newIORef () + _ <- liftIO $ mkWeakIORef ref hook + -- when this copy of svar gets garbage collected "ref" will get + -- garbage collected and our GC hook will be called. + let sv = svar{svarRef = Just ref} + return $ Skip (FromSVarRead sv) + + where + + {-# NOINLINE hook #-} + hook = do + when (svarInspectMode svar) $ do + r <- liftIO $ readIORef (svarStopTime (svarStats svar)) + when (isNothing r) $ + printSVar svar "SVar Garbage Collected" + cleanupSVar svar + -- If there are any SVars referenced by this SVar a GC will prompt + -- them to be cleaned up quickly. 
+ when (svarInspectMode svar) performMajorGC + + step _ (FromSVarRead sv) = do + list <- readOutputQ sv + -- Reversing the output is important to guarantee that we process the + -- outputs in the same order as they were generated by the constituent + -- streams. + return $ Skip $ FromSVarLoop sv (Prelude.reverse list) + + step _ (FromSVarLoop sv []) = do + done <- postProcess sv + return $ Skip $ if done + then (FromSVarDone sv) + else (FromSVarRead sv) + + step _ (FromSVarLoop sv (ev : es)) = do + case ev of + ChildYield a -> return $ Yield a (FromSVarLoop sv es) + ChildStop tid e -> do + accountThread sv tid + case e of + Nothing -> do + stop <- shouldStop tid + if stop + then do + liftIO (cleanupSVar sv) + return $ Skip (FromSVarDone sv) + else return $ Skip (FromSVarLoop sv es) + Just ex -> + case fromException ex of + Just ThreadAbort -> + return $ Skip (FromSVarLoop sv es) + Nothing -> liftIO (cleanupSVar sv) >> throwM ex + where + + shouldStop tid = + case svarStopStyle sv of + StopNone -> return False + StopAny -> return True + StopBy -> do + sid <- liftIO $ readIORef (svarStopBy sv) + return $ if tid == sid then True else False + + step _ (FromSVarDone sv) = do + when (svarInspectMode sv) $ do + t <- liftIO $ getTime Monotonic + liftIO $ writeIORef (svarStopTime (svarStats sv)) (Just t) + liftIO $ printSVar sv "SVar Done" + return Stop + +------------------------------------------------------------------------------- +-- Hoisting the inner monad +------------------------------------------------------------------------------- + {-# INLINE_NORMAL hoist #-} hoist :: Monad n => (forall x. 
m x -> n x) -> Stream m a -> Stream n a hoist f (Stream step state) = (Stream step' state) where + {-# INLINE_LATE step' #-} step' gst st = do r <- f $ step (adaptState gst) st return $ case r of @@ -659,7 +764,7 @@ hoist f (Stream step state) = (Stream step' state) Skip s -> Skip s Stop -> Stop -{-# INLINE_NORMAL generally #-} +{-# INLINE generally #-} generally :: Monad m => Stream Identity a -> Stream m a generally = hoist (return . runIdentity) @@ -668,6 +773,7 @@ liftInner :: (Monad m, MonadTrans t, Monad (t m)) => Stream m a -> Stream (t m) a liftInner (Stream step state) = Stream step' state where + {-# INLINE_LATE step' #-} step' gst st = do r <- lift $ step (adaptState gst) st return $ case r of @@ -679,6 +785,7 @@ liftInner (Stream step state) = Stream step' state runReaderT :: Monad m => s -> Stream (ReaderT s m) a -> Stream m a runReaderT sval (Stream step state) = Stream step' state where + {-# INLINE_LATE step' #-} step' gst st = do r <- Reader.runReaderT (step (adaptState gst) st) sval return $ case r of @@ -690,6 +797,7 @@ runReaderT sval (Stream step state) = Stream step' state evalStateT :: Monad m => s -> Stream (StateT s m) a -> Stream m a evalStateT sval (Stream step state) = Stream step' (state, sval) where + {-# INLINE_LATE step' #-} step' gst (st, sv) = do (r, sv') <- State.runStateT (step (adaptState gst) st) sv return $ case r of @@ -701,6 +809,7 @@ evalStateT sval (Stream step state) = Stream step' (state, sval) runStateT :: Monad m => s -> Stream (StateT s m) a -> Stream m (s, a) runStateT sval (Stream step state) = Stream step' (state, sval) where + {-# INLINE_LATE step' #-} step' gst (st, sv) = do (r, sv') <- State.runStateT (step (adaptState gst) st) sv return $ case r of @@ -3426,6 +3535,37 @@ zipWithM f (Stream stepa ta) (Stream stepb tb) = Stream step (ta, tb, Nothing) zipWith :: Monad m => (a -> b -> c) -> Stream m a -> Stream m b -> Stream m c zipWith f = zipWithM (\a b -> return (f a b)) +-- | Like 'zipWithM' but zips 
concurrently i.e. both the streams being zipped +-- are generated concurrently. +-- +{-# INLINE_NORMAL zipAsyncWithM #-} +zipAsyncWithM :: MonadAsync m + => (a -> b -> m c) -> Stream m a -> Stream m b -> Stream m c +zipAsyncWithM f m1 m2 = Stream step Nothing + + where + + {-# INLINE_LATE step #-} + step gst Nothing = do + ma <- mkParallel (adaptState gst) m1 + mb <- mkParallel (adaptState gst) m2 + return $ Skip (Just $ zipWithM f ma mb) + + step gst (Just (UnStream step1 st)) = do + r <- step1 gst st + return $ case r of + Yield a s -> Yield a (Just $ Stream step1 s) + Skip s -> Skip (Just $ Stream step1 s) + Stop -> Stop + +-- | Like 'zipWith' but zips concurrently i.e. both the streams being zipped +-- are generated concurrently. +-- +{-# INLINE zipAsyncWith #-} +zipAsyncWith :: MonadAsync m + => (a -> b -> c) -> Stream m a -> Stream m b -> Stream m c +zipAsyncWith f = zipAsyncWithM (\a b -> return (f a b)) + ------------------------------------------------------------------------------ -- Merging ------------------------------------------------------------------------------ @@ -3983,3 +4123,106 @@ encodeUtf8 (Stream step state) = Stream step' (EncodeState state WNil) Skip s -> Skip (EncodeState s WNil) Stop -> Stop step' _ (EncodeState s (WCons x xs)) = return $ Yield x (EncodeState s xs) + +------------------------------------------------------------------------------- +-- Concurrent application and fold +------------------------------------------------------------------------------- + +-- Using StreamD the worker stream producing code can fuse with the code to +-- queue output to the SVar giving some perf boost. +-- +-- Note that StreamD can only be used in limited situations, specifically, we +-- cannot implement joinStreamVarPar using this. +-- +-- XXX make sure that the SVar passed is a Parallel style SVar. + +-- | Fold the supplied stream to the SVar asynchronously using Parallel +-- concurrency style. 
+-- {-# INLINE_NORMAL toSVarParallel #-} +{-# INLINE toSVarParallel #-} +toSVarParallel :: MonadAsync m + => State t m a -> SVar t m a -> Stream m a -> m () +toSVarParallel st sv xs = + if svarInspectMode sv + then forkWithDiag + else do + tid <- + case getYieldLimit st of + Nothing -> doFork (work Nothing) + (svarMrun sv) + (handleChildException sv) + Just _ -> doFork (workLim Nothing) + (svarMrun sv) + (handleChildException sv) + modifyThread sv tid + + where + + {-# NOINLINE work #-} + work info = (runFold (FL.toParallelSVar sv info) xs) + + {-# NOINLINE workLim #-} + workLim info = runFold (FL.toParallelSVarLimited sv info) xs + + {-# NOINLINE forkWithDiag #-} + forkWithDiag = do + -- We do not use workerCount in case of ParallelVar but still there is + -- no harm in maintaining it correctly. + liftIO $ atomicModifyIORefCAS_ (workerCount sv) $ \n -> n + 1 + recordMaxWorkers sv + -- This allocation matters when significant number of workers are being + -- sent. We allocate it only when needed. The overhead increases by 4x. 
+ winfo <- + case yieldRateInfo sv of + Nothing -> return Nothing + Just _ -> liftIO $ do + cntRef <- newIORef 0 + t <- getTime Monotonic + lat <- newIORef (0, t) + return $ Just WorkerInfo + { workerYieldMax = 0 + , workerYieldCount = cntRef + , workerLatencyStart = lat + } + tid <- + case getYieldLimit st of + Nothing -> doFork (work winfo) + (svarMrun sv) + (handleChildException sv) + Just _ -> doFork (workLim winfo) + (svarMrun sv) + (handleChildException sv) + modifyThread sv tid + +{-# INLINE_NORMAL mkParallel #-} +mkParallel :: MonadAsync m => State t m a -> Stream m a -> m (Stream m a) +mkParallel st m = do + sv <- newParallelVar StopNone defState + toSVarParallel st sv m + return $ fromSVar sv + +{-# INLINE_NORMAL applyParallel #-} +applyParallel :: MonadAsync m + => (Stream m a -> Stream m b) -> Stream m a -> Stream m b +applyParallel f m = Stream step Nothing + where + + {-# INLINE_LATE step #-} + step gst Nothing = do + sv <- newParallelVar StopNone (adaptState gst) + toSVarParallel (adaptState gst) sv m + return $ Skip $ Just $ f (fromSVar sv) + + step gst (Just (UnStream step1 st)) = do + r <- step1 gst st + return $ case r of + Yield a s -> Yield a (Just $ Stream step1 s) + Skip s -> Skip (Just $ Stream step1 s) + Stop -> Stop + +{-# INLINE foldParallel #-} +foldParallel :: (K.IsStream t, MonadAsync m) => (t m a -> m b) -> t m a -> m b +foldParallel f m = do + sv <- newParallelVar StopNone defState + toSVarParallel defState sv (toStreamD m) + f $ fromStreamD $ fromSVar sv