Add concurrency control using "threads" and "buffer"

Also exposed the "filterM" API.

Some benchmarks are affected with this. The most affected benchmarks are:

StreamK ops:
before:
serially/generation/foldMapWith          mean 1.940 ms
serially/generation/foldMapWithM         mean 3.891 ms

After:
serially/generation/foldMapWith          mean 2.874 ms
serially/generation/foldMapWithM         mean 5.003 ms

StreamD ops:
zip/zipM are affected
This commit is contained in:
Harendra Kumar 2018-06-29 10:25:55 +05:30
parent 9fe6dc1726
commit 211df792a4
13 changed files with 455 additions and 304 deletions

View File

@ -12,6 +12,8 @@
### Enhancements
* Add concurrency control primitives `threads` and `buffer`.
* Significant performance improvements
* Add `yield` to construct a singleton stream from a pure value
* Add `repeat` to generate an infinite stream by repeating a pure value
* Add `fromList` and `fromListM` to generate streams from lists, faster than
@ -19,7 +21,7 @@
* Add `map` as a synonym of fmap
* Add `scanlM'`, the monadic version of scanl'
* Add `takeWhileM` and `dropWhileM`
* Significant performance improvements
* Add `filterM`
## 0.3.0

View File

@ -100,6 +100,10 @@ module Streamly
, wAsync
, parallel
-- * Concurrency Control
, threads
, buffer
-- * Folding Containers of Streams
-- $foldutils
, foldWith
@ -159,6 +163,7 @@ import Streamly.Streams.Ahead
import Streamly.Streams.Parallel
import Streamly.Streams.Zip
import Streamly.Streams.Prelude
import Streamly.Streams.SVar (threads, buffer)
import Streamly.SVar (MonadAsync)
import Data.Semigroup (Semigroup(..))

View File

@ -119,6 +119,7 @@ module Streamly.Prelude
-- ** Filtering
, filter
, filterM
, take
, takeWhile
, takeWhileM
@ -163,7 +164,7 @@ import Prelude
import qualified Prelude
import qualified System.IO as IO
import Streamly.SVar (MonadAsync)
import Streamly.SVar (MonadAsync, defState, rstState)
import Streamly.Streams.StreamK (IsStream(..))
import Streamly.Streams.Serial (SerialT)
@ -619,7 +620,7 @@ toHandle h m = go (toStream m)
let stop = return ()
single a = liftIO (IO.hPutStrLn h a)
yieldk a r = liftIO (IO.hPutStrLn h a) >> go r
in (K.unStream m1) Nothing stop single yieldk
in (K.unStream m1) defState stop single yieldk
------------------------------------------------------------------------------
-- Transformation by Folding (Scans)
@ -675,6 +676,13 @@ filter :: IsStream t => (a -> Bool) -> t m a -> t m a
filter = K.filter
#endif
-- | Same as 'filter' but with a monadic predicate.
--
-- @since 0.4.0
{-# INLINE filterM #-}
filterM :: (IsStream t, Monad m) => (a -> m Bool) -> t m a -> t m a
filterM p m = fromStreamD $ D.filterM p $ toStreamD m
-- | Take first 'n' elements from the stream and discard the rest.
--
-- @since 0.1.0
@ -796,12 +804,12 @@ mapMaybeM f = fmap fromJust . filter isJust . mapM f
reverse :: (IsStream t) => t m a -> t m a
reverse m = fromStream $ go K.nil (toStream m)
where
go rev rest = K.Stream $ \_ stp sng yld ->
let runIt x = K.unStream x Nothing stp sng yld
go rev rest = K.Stream $ \st stp sng yld ->
let runIt x = K.unStream x (rstState st) stp sng yld
stop = runIt rev
single a = runIt $ a `K.cons` rev
yieldk a r = runIt $ go (a `K.cons` rev) r
in K.unStream rest Nothing stop single yieldk
in K.unStream rest (rstState st) stop single yieldk
------------------------------------------------------------------------------
-- Zipping

View File

@ -23,6 +23,11 @@ module Streamly.SVar
MonadAsync
, SVar (..)
, SVarStyle (..)
, defaultMaxBuffer
, defaultMaxThreads
, State (..)
, defState
, rstState
, newAheadVar
, newParallelVar
@ -71,6 +76,7 @@ import Data.Functor (void)
import Data.Heap (Heap, Entry(..))
import Data.IORef
(IORef, modifyIORef, newIORef, readIORef, atomicModifyIORef)
import Data.Maybe (fromJust)
import Data.Set (Set)
import GHC.Conc (ThreadId(..))
import GHC.Exts
@ -180,6 +186,29 @@ data SVar t m a =
#endif
}
data State t m a = State
{ streamVar :: Maybe (SVar t m a)
, maxThreads :: Int
, maxBuffer :: Int
}
defaultMaxThreads, defaultMaxBuffer :: Int
defaultMaxThreads = 1500
defaultMaxBuffer = 1500
defState :: State t m a
defState = State
{ streamVar = Nothing
, maxThreads = defaultMaxThreads
, maxBuffer = defaultMaxBuffer
}
-- We can optimize this so that we clear it only if it is a Just value, it
-- results in slightly better perf for zip/zipM but the performance of scan
-- worsens a lot, it does not fuse.
rstState :: State t m a -> State t m b
rstState st = st {streamVar = Nothing}
#ifdef DIAGNOSTICS
{-# NOINLINE dumpSVar #-}
dumpSVar :: SVar t m a -> IO String
@ -306,14 +335,11 @@ doFork action exHandler =
-- TBD Each worker can have their own queue and the consumer can empty one
-- queue at a time, that way contention can be reduced.
maxOutputQLen :: Int
maxOutputQLen = 1500
-- | This function is used by the producer threads to queue output for the
-- consumer thread to consume. Returns whether the queue has more space.
{-# NOINLINE send #-}
send :: SVar t m a -> ChildEvent a -> IO Bool
send sv msg = do
send :: Int -> SVar t m a -> ChildEvent a -> IO Bool
send maxOutputQLen sv msg = do
len <- atomicModifyIORefCAS (outputQueue sv) $ \(es, n) ->
((msg : es, n + 1), n)
when (len <= 0) $ do
@ -327,13 +353,13 @@ send sv msg = do
-- The important point is that the consumer is guaranteed to receive a
-- doorbell if something was added to the queue after it empties it.
void $ tryPutMVar (outputDoorBell sv) ()
return (len < maxOutputQLen)
return (len < maxOutputQLen || maxOutputQLen < 0)
{-# NOINLINE sendStop #-}
sendStop :: SVar t m a -> IO ()
sendStop sv = do
liftIO $ atomicModifyIORefCAS_ (workerCount sv) $ \n -> n - 1
myThreadId >>= \tid -> void $ send sv (ChildStop tid Nothing)
myThreadId >>= \tid -> void $ send (-1) sv (ChildStop tid Nothing)
-------------------------------------------------------------------------------
-- Async
@ -360,17 +386,18 @@ enqueueLIFO sv q m = do
{-# INLINE workLoopLIFO #-}
workLoopLIFO :: MonadIO m
=> (SVar t m a -> IORef [t m a] -> t m a -> m () -> m ())
-> SVar t m a -> IORef [t m a] -> m ()
workLoopLIFO f sv q = run
=> (State t m a -> IORef [t m a] -> t m a -> m () -> m ())
-> State t m a -> IORef [t m a] -> m ()
workLoopLIFO f st q = run
where
sv = fromJust $ streamVar st
run = do
work <- dequeue
case work of
Nothing -> liftIO $ sendStop sv
Just m -> f sv q m run
Just m -> f st q m run
dequeue = liftIO $ atomicModifyIORefCAS q $ \case
[] -> ([], Nothing)
@ -401,18 +428,18 @@ enqueueFIFO sv q m = do
{-# INLINE workLoopFIFO #-}
workLoopFIFO :: MonadIO m
=> (SVar t m a -> LinkedQueue (t m a) -> t m a -> m () -> m ())
-> SVar t m a -> LinkedQueue (t m a) -> m ()
workLoopFIFO f sv q = run
=> (State t m a -> LinkedQueue (t m a) -> t m a -> m () -> m ())
-> State t m a -> LinkedQueue (t m a) -> m ()
workLoopFIFO f st q = run
where
sv = fromJust $ streamVar st
run = do
work <- liftIO $ tryPopR q
case work of
Nothing -> liftIO $ sendStop sv
-- Just m -> runStreamFIFO sv q m run
Just m -> f sv q m run
Just m -> f st q m run
-------------------------------------------------------------------------------
-- Ahead
@ -596,7 +623,7 @@ allThreadsDone sv = liftIO $ S.null <$> readIORef (workerThreads sv)
handleChildException :: SVar t m a -> SomeException -> IO ()
handleChildException sv e = do
tid <- myThreadId
void $ send sv (ChildStop tid (Just e))
void $ send (-1) sv (ChildStop tid (Just e))
#ifdef DIAGNOSTICS
recordMaxWorkers :: MonadIO m => SVar t m a -> m ()
@ -635,11 +662,8 @@ pushWorkerPar sv wloop = do
#endif
doFork wloop (handleChildException sv) >>= modifyThread sv
maxWorkerLimit :: Int
maxWorkerLimit = 1500
dispatchWorker :: MonadAsync m => SVar t m a -> m ()
dispatchWorker sv = do
dispatchWorker :: MonadAsync m => Int -> SVar t m a -> m ()
dispatchWorker maxWorkerLimit sv = do
done <- liftIO $ isWorkDone sv
when (not done) $ do
-- Note that the worker count is only decremented during event
@ -651,11 +675,11 @@ dispatchWorker sv = do
-- executing. In that case we should either configure the maxWorker
-- count to higher or use parallel style instead of ahead or async
-- style.
when (cnt < maxWorkerLimit) $ pushWorker sv
when (cnt < maxWorkerLimit || maxWorkerLimit < 0) $ pushWorker sv
{-# NOINLINE sendWorkerWait #-}
sendWorkerWait :: MonadAsync m => SVar t m a -> m ()
sendWorkerWait sv = do
sendWorkerWait :: MonadAsync m => Int -> SVar t m a -> m ()
sendWorkerWait maxWorkerLimit sv = do
-- Note that we are guaranteed to have at least one outstanding worker when
-- we enter this function. So if we sleep we are guaranteed to be woken up
-- by a outputDoorBell, when the worker exits.
@ -707,7 +731,7 @@ sendWorkerWait sv = do
liftIO $ atomicModifyIORefCAS_ (needDoorBell sv) $ const True
liftIO $ storeLoadBarrier
dispatchWorker sv
dispatchWorker maxWorkerLimit sv
-- XXX test for the case when we miss sending a worker when the worker
-- count is more than 1500.
@ -722,8 +746,8 @@ sendWorkerWait sv = do
liftIO $ withDBGMVar sv "sendWorkerWait: nothing to do"
$ takeMVar (outputDoorBell sv)
(_, len) <- liftIO $ readIORef (outputQueue sv)
when (len <= 0) $ sendWorkerWait sv
else sendWorkerWait sv
when (len <= 0) $ sendWorkerWait maxWorkerLimit sv
else sendWorkerWait maxWorkerLimit sv
{-# INLINE readOutputQRaw #-}
readOutputQRaw :: SVar t m a -> IO ([ChildEvent a], Int)
@ -735,8 +759,8 @@ readOutputQRaw sv = do
#endif
return (list, len)
readOutputQBounded :: MonadAsync m => SVar t m a -> m [ChildEvent a]
readOutputQBounded sv = do
readOutputQBounded :: MonadAsync m => Int -> SVar t m a -> m [ChildEvent a]
readOutputQBounded n sv = do
(list, len) <- liftIO $ readOutputQRaw sv
-- When there is no output seen we dispatch more workers to help
-- out if there is work pending in the work queue.
@ -760,7 +784,7 @@ readOutputQBounded sv = do
{-# INLINE blockingRead #-}
blockingRead = do
sendWorkerWait sv
sendWorkerWait n sv
liftIO $ (readOutputQRaw sv >>= return . fst)
postProcessBounded :: MonadAsync m => SVar t m a -> m Bool
@ -778,12 +802,13 @@ postProcessBounded sv = do
else return False
getAheadSVar :: MonadAsync m
=> ( SVar t m a
=> State t m a
-> ( State t m a
-> IORef ([t m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry t m a)), Int)
-> m ())
-> IO (SVar t m a)
getAheadSVar f = do
getAheadSVar st f = do
outQ <- newIORef ([], 0)
outH <- newIORef (H.empty, 0)
outQMv <- newEmptyMVar
@ -802,11 +827,11 @@ getAheadSVar f = do
let sv =
SVar { outputQueue = outQ
, outputDoorBell = outQMv
, readOutputQ = readOutputQBounded sv
, readOutputQ = readOutputQBounded (maxThreads st) sv
, postProcess = postProcessBounded sv
, workerThreads = running
-- , workLoop = workLoopAhead sv q outH
, workLoop = f sv q outH
, workLoop = f st{streamVar = Just sv} q outH
, enqueue = enqueueAhead sv q
, isWorkDone = isWorkDoneAhead q outH
, needDoorBell = wfw
@ -894,14 +919,15 @@ sendWorker sv m = do
{-# INLINABLE newAheadVar #-}
newAheadVar :: MonadAsync m
=> t m a
-> ( SVar t m a
=> State t m a
-> t m a
-> ( State t m a
-> IORef ([t m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry t m a)), Int)
-> m ())
-> m (SVar t m a)
newAheadVar m wloop = do
sv <- liftIO $ getAheadSVar wloop
newAheadVar st m wloop = do
sv <- liftIO $ getAheadSVar st wloop
sendWorker sv m
{-# INLINABLE newParallelVar #-}

View File

@ -115,14 +115,16 @@ import Prelude hiding (map)
-- Thererefore the queue never has more than on item in it.
workLoopAhead :: MonadIO m
=> SVar Stream m a
=> State Stream m a
-> IORef ([Stream m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry Stream m a)) , Int)
-> m ()
workLoopAhead sv q heap = runHeap
workLoopAhead st q heap = runHeap
where
sv = fromJust $ streamVar st
maxBuf = maxBuffer st
toHeap seqNo ent = do
hp <- liftIO $ atomicModifyIORefCAS heap $ \(h, snum) ->
((H.insert (Entry seqNo ent) h, snum), h)
@ -134,7 +136,7 @@ workLoopAhead sv q heap = runHeap
yieldToHeap seqNo a r = toHeap seqNo (AheadEntryStream (a `K.cons` r))
singleOutput seqNo a = do
continue <- liftIO $ send sv (ChildYield a)
continue <- liftIO $ send maxBuf sv (ChildYield a)
if continue
then runQueueToken seqNo
else liftIO $ do
@ -142,11 +144,11 @@ workLoopAhead sv q heap = runHeap
sendStop sv
yieldOutput seqNo a r = do
continue <- liftIO $ send sv (ChildYield a)
continue <- liftIO $ send maxBuf sv (ChildYield a)
if continue
then unStream r (Just sv) (runQueueToken seqNo)
(singleOutput seqNo)
(yieldOutput seqNo)
then unStream r st (runQueueToken seqNo)
(singleOutput seqNo)
(yieldOutput seqNo)
else liftIO $ do
atomicModifyIORefCAS_ heap $ \(h, _) ->
(H.insert (Entry seqNo (AheadEntryStream r)) h, seqNo)
@ -163,15 +165,15 @@ workLoopAhead sv q heap = runHeap
Just (m, seqNo) -> do
if seqNo == prevSeqNo + 1
then
unStream m (Just sv) (runQueueToken seqNo)
(singleOutput seqNo)
(yieldOutput seqNo)
unStream m st (runQueueToken seqNo)
(singleOutput seqNo)
(yieldOutput seqNo)
else do
liftIO $ atomicModifyIORefCAS_ heap $ \(h, _) ->
(h, prevSeqNo + 1)
unStream m (Just sv) runHeap
(singleToHeap seqNo)
(yieldToHeap seqNo)
unStream m st runHeap
(singleToHeap seqNo)
(yieldToHeap seqNo)
runQueueNoToken = do
work <- dequeueAhead q
case work of
@ -179,13 +181,13 @@ workLoopAhead sv q heap = runHeap
Just (m, seqNo) -> do
if seqNo == 0
then
unStream m (Just sv) (runQueueToken seqNo)
(singleOutput seqNo)
(yieldOutput seqNo)
unStream m st (runQueueToken seqNo)
(singleOutput seqNo)
(yieldOutput seqNo)
else
unStream m (Just sv) runHeap
(singleToHeap seqNo)
(yieldToHeap seqNo)
unStream m st runHeap
(singleToHeap seqNo)
(yieldToHeap seqNo)
{-# NOINLINE runHeap #-}
runHeap = do
@ -206,9 +208,9 @@ workLoopAhead sv q heap = runHeap
case hent of
AheadEntryPure a -> singleOutput seqNo a
AheadEntryStream r ->
unStream r (Just sv) (runQueueToken seqNo)
(singleOutput seqNo)
(yieldOutput seqNo)
unStream r st (runQueueToken seqNo)
(singleOutput seqNo)
(yieldOutput seqNo)
-------------------------------------------------------------------------------
-- WAhead
@ -221,25 +223,25 @@ workLoopAhead sv q heap = runHeap
-- The only difference between forkSVarAsync and this is that we run the left
-- computation without a shared SVar.
forkSVarAhead :: MonadAsync m => Stream m a -> Stream m a -> Stream m a
forkSVarAhead m1 m2 = Stream $ \_ stp sng yld -> do
sv <- newAheadVar (concurrently m1 m2) workLoopAhead
unStream (fromSVar sv) Nothing stp sng yld
forkSVarAhead m1 m2 = Stream $ \st stp sng yld -> do
sv <- newAheadVar st (concurrently m1 m2) workLoopAhead
unStream (fromSVar sv) (rstState st) stp sng yld
where
concurrently ma mb = Stream $ \svr stp sng yld -> do
liftIO $ enqueue (fromJust svr) mb
unStream ma Nothing stp sng yld
concurrently ma mb = Stream $ \st stp sng yld -> do
liftIO $ enqueue (fromJust $ streamVar st) mb
unStream ma (rstState st) stp sng yld
{-# INLINE aheadS #-}
aheadS :: MonadAsync m => Stream m a -> Stream m a -> Stream m a
aheadS m1 m2 = Stream $ \svr stp sng yld -> do
case svr of
aheadS m1 m2 = Stream $ \st stp sng yld -> do
case streamVar st of
Just sv | svarStyle sv == AheadVar -> do
liftIO $ enqueue sv m2
-- Always run the left side on a new SVar to avoid complexity in
-- sequencing results. This means the left side cannot further
-- split into more ahead computations on the same SVar.
unStream m1 Nothing stp sng yld
_ -> unStream (forkSVarAhead m1 m2) Nothing stp sng yld
unStream m1 (rstState st) stp sng yld
_ -> unStream (forkSVarAhead m1 m2) (rstState st) stp sng yld
-- | XXX we can implement it more efficienty by directly implementing instead
-- of combining streams using ahead.
@ -358,11 +360,11 @@ aheadbind
aheadbind m f = go m
where
go (Stream g) =
Stream $ \ctx stp sng yld ->
let run x = unStream x ctx stp sng yld
Stream $ \st stp sng yld ->
let run x = unStream x st stp sng yld
single a = run $ f a
yieldk a r = run $ f a `aheadS` go r
in g Nothing stp single yieldk
in g (rstState st) stp single yieldk
instance MonadAsync m => Monad (AheadT m) where
return = pure

View File

@ -26,6 +26,7 @@ module Streamly.Streams.Async
, async
, (<|) --deprecated
, mkAsync
, mkAsync'
, WAsyncT
, WAsync
@ -65,16 +66,18 @@ import qualified Streamly.Streams.StreamK as K
{-# INLINE runStreamLIFO #-}
runStreamLIFO :: MonadIO m
=> SVar Stream m a -> IORef [Stream m a] -> Stream m a -> m () -> m ()
runStreamLIFO sv q m stop = unStream m (Just sv) stop single yieldk
=> State Stream m a -> IORef [Stream m a] -> Stream m a -> m () -> m ()
runStreamLIFO st q m stop = unStream m st stop single yieldk
where
sv = fromJust $ streamVar st
maxBuf = maxBuffer st
single a = do
res <- liftIO $ send sv (ChildYield a)
res <- liftIO $ send maxBuf sv (ChildYield a)
if res then stop else liftIO $ sendStop sv
yieldk a r = do
res <- liftIO $ send sv (ChildYield a)
res <- liftIO $ send maxBuf sv (ChildYield a)
if res
then (unStream r) (Just sv) stop single yieldk
then (unStream r) st stop single yieldk
else liftIO $ enqueueLIFO sv q r >> sendStop sv
-------------------------------------------------------------------------------
@ -82,15 +85,22 @@ runStreamLIFO sv q m stop = unStream m (Just sv) stop single yieldk
-------------------------------------------------------------------------------
{-# INLINE runStreamFIFO #-}
runStreamFIFO :: MonadIO m
=> SVar Stream m a -> LinkedQueue (Stream m a) -> Stream m a -> m () -> m ()
runStreamFIFO sv q m stop = unStream m (Just sv) stop single yieldk
runStreamFIFO
:: MonadIO m
=> State Stream m a
-> LinkedQueue (Stream m a)
-> Stream m a
-> m ()
-> m ()
runStreamFIFO st q m stop = unStream m st stop single yieldk
where
sv = fromJust $ streamVar st
maxBuf = maxBuffer st
single a = do
res <- liftIO $ send sv (ChildYield a)
res <- liftIO $ send maxBuf sv (ChildYield a)
if res then stop else liftIO $ sendStop sv
yieldk a r = do
res <- liftIO $ send sv (ChildYield a)
res <- liftIO $ send maxBuf sv (ChildYield a)
liftIO (enqueueFIFO sv q r)
if res then stop else liftIO $ sendStop sv
@ -103,8 +113,8 @@ runStreamFIFO sv q m stop = unStream m (Just sv) stop single yieldk
-- function argument to this function results in a perf degradation of more
-- than 10%. Need to investigate what the root cause is.
-- Interestingly, the same thing does not make any difference for Ahead.
getLifoSVar :: MonadAsync m => IO (SVar Stream m a)
getLifoSVar = do
getLifoSVar :: MonadAsync m => State Stream m a -> IO (SVar Stream m a)
getLifoSVar st = do
outQ <- newIORef ([], 0)
outQMv <- newEmptyMVar
active <- newIORef 0
@ -122,10 +132,11 @@ getLifoSVar = do
let sv =
SVar { outputQueue = outQ
, outputDoorBell = outQMv
, readOutputQ = readOutputQBounded sv
, readOutputQ = readOutputQBounded (maxThreads st) sv
, postProcess = postProcessBounded sv
, workerThreads = running
, workLoop = workLoopLIFO runStreamLIFO sv q
, workLoop = workLoopLIFO runStreamLIFO
st{streamVar = Just sv} q
, enqueue = enqueueLIFO sv q
, isWorkDone = checkEmpty
, needDoorBell = wfw
@ -144,8 +155,8 @@ getLifoSVar = do
}
in return sv
getFifoSVar :: MonadAsync m => IO (SVar Stream m a)
getFifoSVar = do
getFifoSVar :: MonadAsync m => State Stream m a -> IO (SVar Stream m a)
getFifoSVar st = do
outQ <- newIORef ([], 0)
outQMv <- newEmptyMVar
active <- newIORef 0
@ -162,10 +173,11 @@ getFifoSVar = do
let sv =
SVar { outputQueue = outQ
, outputDoorBell = outQMv
, readOutputQ = readOutputQBounded sv
, readOutputQ = readOutputQBounded (maxThreads st) sv
, postProcess = postProcessBounded sv
, workerThreads = running
, workLoop = workLoopFIFO runStreamFIFO sv q
, workLoop = workLoopFIFO runStreamFIFO
st{streamVar = Just sv} q
, enqueue = enqueueFIFO sv q
, isWorkDone = nullQ q
, needDoorBell = wfw
@ -185,9 +197,10 @@ getFifoSVar = do
in return sv
{-# INLINABLE newAsyncVar #-}
newAsyncVar :: MonadAsync m => Stream m a -> m (SVar Stream m a)
newAsyncVar m = do
sv <- liftIO getLifoSVar
newAsyncVar :: MonadAsync m
=> State Stream m a -> Stream m a -> m (SVar Stream m a)
newAsyncVar st m = do
sv <- liftIO $ getLifoSVar st
sendWorker sv m
-- XXX Get rid of this?
@ -200,13 +213,18 @@ newAsyncVar m = do
-- @since 0.2.0
{-# INLINABLE mkAsync #-}
mkAsync :: (IsStream t, MonadAsync m) => t m a -> m (t m a)
mkAsync m = newAsyncVar (toStream m) >>= return . fromSVar
mkAsync m = newAsyncVar defState (toStream m) >>= return . fromSVar
{-# INLINABLE mkAsync' #-}
mkAsync' :: (IsStream t, MonadAsync m) => State Stream m a -> t m a -> m (t m a)
mkAsync' st m = newAsyncVar st (toStream m) >>= return . fromSVar
-- | Create a new SVar and enqueue one stream computation on it.
{-# INLINABLE newWAsyncVar #-}
newWAsyncVar :: MonadAsync m => Stream m a -> m (SVar Stream m a)
newWAsyncVar m = do
sv <- liftIO getFifoSVar
newWAsyncVar :: MonadAsync m
=> State Stream m a -> Stream m a -> m (SVar Stream m a)
newWAsyncVar st m = do
sv <- liftIO $ getFifoSVar st
sendWorker sv m
------------------------------------------------------------------------------
@ -275,25 +293,25 @@ newWAsyncVar m = do
forkSVarAsync :: MonadAsync m
=> SVarStyle -> Stream m a -> Stream m a -> Stream m a
forkSVarAsync style m1 m2 = Stream $ \_ stp sng yld -> do
forkSVarAsync style m1 m2 = Stream $ \st stp sng yld -> do
sv <- case style of
AsyncVar -> newAsyncVar (concurrently m1 m2)
WAsyncVar -> newWAsyncVar (concurrently m1 m2)
AsyncVar -> newAsyncVar st (concurrently m1 m2)
WAsyncVar -> newWAsyncVar st (concurrently m1 m2)
_ -> error "illegal svar type"
unStream (fromSVar sv) Nothing stp sng yld
unStream (fromSVar sv) (rstState st) stp sng yld
where
concurrently ma mb = Stream $ \svr stp sng yld -> do
liftIO $ enqueue (fromJust svr) mb
unStream ma svr stp sng yld
concurrently ma mb = Stream $ \st stp sng yld -> do
liftIO $ enqueue (fromJust $ streamVar st) mb
unStream ma st stp sng yld
{-# INLINE joinStreamVarAsync #-}
joinStreamVarAsync :: MonadAsync m
=> SVarStyle -> Stream m a -> Stream m a -> Stream m a
joinStreamVarAsync style m1 m2 = Stream $ \svr stp sng yld ->
case svr of
joinStreamVarAsync style m1 m2 = Stream $ \st stp sng yld ->
case streamVar st of
Just sv | svarStyle sv == style ->
liftIO (enqueue sv m2) >> unStream m1 svr stp sng yld
_ -> unStream (forkSVarAsync style m1 m2) Nothing stp sng yld
liftIO (enqueue sv m2) >> unStream m1 st stp sng yld
_ -> unStream (forkSVarAsync style m1 m2) (rstState st) stp sng yld
------------------------------------------------------------------------------
-- Semigroup and Monoid style compositions for parallel actions

View File

@ -43,6 +43,7 @@ import Control.Monad.Reader.Class (MonadReader(..))
import Control.Monad.State.Class (MonadState(..))
import Control.Monad.Trans.Class (MonadTrans(lift))
import Data.Functor (void)
import Data.Maybe (fromJust)
import Data.Semigroup (Semigroup(..))
import Prelude hiding (map)
@ -59,36 +60,38 @@ import qualified Streamly.Streams.StreamK as K
-------------------------------------------------------------------------------
{-# NOINLINE runOne #-}
runOne :: MonadIO m => SVar Stream m a -> Stream m a -> m ()
runOne sv m = (unStream m) (Just sv) stop single yieldk
runOne :: MonadIO m => State Stream m a -> Stream m a -> m ()
runOne st m = unStream m st stop single yieldk
where
sv = fromJust $ streamVar st
stop = liftIO $ sendStop sv
sendit a = liftIO $ send sv (ChildYield a)
sendit a = liftIO $ send (-1) sv (ChildYield a)
single a = sendit a >> stop
-- XXX there is no flow control in parallel case. We should perhaps use a
-- queue and queue it back on that and exit the thread when the outputQueue
-- overflows. Parallel is dangerous because it can accumulate unbounded
-- output in the buffer.
yieldk a r = void (sendit a) >> runOne sv r
yieldk a r = void (sendit a) >> runOne st r
{-# NOINLINE forkSVarPar #-}
forkSVarPar :: MonadAsync m => Stream m a -> Stream m a -> Stream m a
forkSVarPar m r = Stream $ \_ stp sng yld -> do
forkSVarPar m r = Stream $ \st stp sng yld -> do
sv <- newParallelVar
pushWorkerPar sv (runOne sv m)
pushWorkerPar sv (runOne sv r)
(unStream (fromSVar sv)) Nothing stp sng yld
pushWorkerPar sv (runOne st{streamVar = Just sv} m)
pushWorkerPar sv (runOne st{streamVar = Just sv} r)
(unStream (fromSVar sv)) (rstState st) stp sng yld
{-# INLINE joinStreamVarPar #-}
joinStreamVarPar :: MonadAsync m
=> SVarStyle -> Stream m a -> Stream m a -> Stream m a
joinStreamVarPar style m1 m2 = Stream $ \svr stp sng yld ->
case svr of
joinStreamVarPar style m1 m2 = Stream $ \st stp sng yld ->
case streamVar st of
Just sv | svarStyle sv == style -> do
pushWorkerPar sv (runOne sv m1) >> (unStream m2) svr stp sng yld
_ -> unStream (forkSVarPar m1 m2) Nothing stp sng yld
pushWorkerPar sv (runOne st m1)
unStream m2 st stp sng yld
_ -> unStream (forkSVarPar m1 m2) (rstState st) stp sng yld
{-# INLINE parallelStream #-}
parallelStream :: MonadAsync m => Stream m a -> Stream m a -> Stream m a
@ -115,7 +118,7 @@ parallel m1 m2 = fromStream $ parallelStream (toStream m1) (toStream m2)
mkParallel :: (IsStream t, MonadAsync m) => t m a -> m (t m a)
mkParallel m = do
sv <- newParallelVar
pushWorkerPar sv (runOne sv $ toStream m)
pushWorkerPar sv (runOne defState{streamVar = Just sv} $ toStream m)
return $ fromSVar sv
------------------------------------------------------------------------------
@ -124,10 +127,10 @@ mkParallel m = do
{-# INLINE applyWith #-}
applyWith :: (IsStream t, MonadAsync m) => (t m a -> t m b) -> t m a -> t m b
applyWith f m = fromStream $ Stream $ \svr stp sng yld -> do
applyWith f m = fromStream $ Stream $ \st stp sng yld -> do
sv <- newParallelVar
pushWorkerPar sv (runOne sv (toStream m))
unStream (toStream $ f $ fromSVar sv) svr stp sng yld
pushWorkerPar sv (runOne st{streamVar = Just sv} (toStream m))
unStream (toStream $ f $ fromSVar sv) st stp sng yld
------------------------------------------------------------------------------
-- Stream runner concurrent function application
@ -137,7 +140,7 @@ applyWith f m = fromStream $ Stream $ \svr stp sng yld -> do
runWith :: (IsStream t, MonadAsync m) => (t m a -> m b) -> t m a -> m b
runWith f m = do
sv <- newParallelVar
pushWorkerPar sv (runOne sv $ toStream m)
pushWorkerPar sv (runOne defState{streamVar = Just sv} $ toStream m)
f $ fromSVar sv
------------------------------------------------------------------------------

View File

@ -23,6 +23,8 @@ module Streamly.Streams.SVar
(
fromSVar
, toSVar
, threads
, buffer
)
where
@ -39,12 +41,12 @@ import Streamly.Streams.StreamK
-- | Pull a stream from an SVar.
{-# NOINLINE fromStreamVar #-}
fromStreamVar :: MonadAsync m => SVar Stream m a -> Stream m a
fromStreamVar sv = Stream $ \_ stp sng yld -> do
fromStreamVar sv = Stream $ \st stp sng yld -> do
list <- readOutputQ sv
-- Reversing the output is important to guarantee that we process the
-- outputs in the same order as they were generated by the constituent
-- streams.
unStream (processEvents $ reverse list) Nothing stp sng yld
unStream (processEvents $ reverse list) (rstState st) stp sng yld
where
@ -58,20 +60,20 @@ fromStreamVar sv = Stream $ \_ stp sng yld -> do
stp
{-# INLINE processEvents #-}
processEvents [] = Stream $ \_ stp sng yld -> do
processEvents [] = Stream $ \st stp sng yld -> do
done <- postProcess sv
if done
then allDone stp
else unStream (fromStreamVar sv) Nothing stp sng yld
else unStream (fromStreamVar sv) (rstState st) stp sng yld
processEvents (ev : es) = Stream $ \_ stp sng yld -> do
processEvents (ev : es) = Stream $ \st stp sng yld -> do
let rest = processEvents es
case ev of
ChildYield a -> yld a rest
ChildStop tid e -> do
accountThread sv tid
case e of
Nothing -> unStream rest Nothing stp sng yld
Nothing -> unStream rest (rstState st) stp sng yld
Just ex -> throwM ex
{-# INLINE fromSVar #-}
@ -82,3 +84,37 @@ fromSVar sv = fromStream $ fromStreamVar sv
-- be read back from the SVar using 'fromSVar'.
toSVar :: (IsStream t, MonadAsync m) => SVar Stream m a -> t m a -> m ()
toSVar sv m = toStreamVar sv (toStream m)
-- | Specify the maximum number of threads that can be spawned concurrently
-- when using concurrent streams. This is not the grand total number of threads
-- but maximum threads at each point of concurrency.
-- A value of 0 resets the thread limit to default, a negative value means
-- there is no limit. Note that this primitive has no effect on 'Parallel'
-- streams, the number of threads for 'Parallel' streams are always unbounded.
--
-- This primitive can be used at any point in the composition, and it affects
-- only the enclosed stream. When nested primitives are used the nearest
-- enclosing primitive overrides the outer ones.
-- Note that the use of this primitive does not enable concurrency, to enable
-- concurrency you have to use one of the concurrent stream type combinators.
--
-- @since 0.4.0
threads :: IsStream t => Int -> t m a -> t m a
threads n m = fromStream $ Stream $ \st stp sng yld -> do
let n' = if n == 0 then defaultMaxThreads else n
unStream (toStream m) (st {maxThreads = n'}) stp sng yld
-- | Specify the maximum size of the buffer for storing the results from
-- concurrent computations. If the buffer becomes full we stop spawning more
-- concurrent tasks until there is space in the buffer.
-- A value of 0 resets the buffer size to default, a negative value means
-- there is no limit. Note that this primitive has no effect on 'Parallel'
-- streams, the buffer size for 'Parallel' streams is always unbounded.
--
-- The same scoping rules apply as for the 'threads' primitive.
--
-- @since 0.4.0
buffer :: IsStream t => Int -> t m a -> t m a
buffer n m = fromStream $ Stream $ \st stp sng yld -> do
let n' = if n == 0 then defaultMaxBuffer else n
unStream (toStream m) (st {maxBuffer = n'}) stp sng yld

View File

@ -53,6 +53,7 @@ import Control.Monad.Trans.Class (MonadTrans(lift))
import Data.Semigroup (Semigroup(..))
import Prelude hiding (map, mapM)
import Streamly.SVar (rstState)
import Streamly.Streams.StreamK (IsStream(..), adapt, Stream(..))
import qualified Streamly.Streams.StreamK as K
import qualified Streamly.Streams.StreamD as D
@ -173,11 +174,11 @@ serial m1 m2 = fromStream $ K.serial (toStream m1) (toStream m2)
instance Monad m => Monad (SerialT m) where
return = pure
(SerialT (Stream m)) >>= f = SerialT $ Stream $ \_ stp sng yld ->
let run x = (unStream x) Nothing stp sng yld
(SerialT (Stream m)) >>= f = SerialT $ Stream $ \st stp sng yld ->
let run x = (unStream x) (rstState st) stp sng yld
single a = run $ toStream (f a)
yieldk a r = run $ toStream $ f a <> (fromStream r >>= f)
in m Nothing stp single yieldk
in m (rstState st) stp single yieldk
------------------------------------------------------------------------------
-- Other instances
@ -282,11 +283,11 @@ instance IsStream WSerialT where
{-# INLINE interleave #-}
interleave :: Stream m a -> Stream m a -> Stream m a
interleave m1 m2 = Stream $ \_ stp sng yld -> do
let stop = (unStream m2) Nothing stp sng yld
interleave m1 m2 = Stream $ \st stp sng yld -> do
let stop = (unStream m2) (rstState st) stp sng yld
single a = yld a m2
yieldk a r = yld a (interleave m2 r)
(unStream m1) Nothing stop single yieldk
(unStream m1) (rstState st) stop single yieldk
-- | Polymorphic version of the 'Semigroup' operation '<>' of 'WSerialT'.
-- Interleaves two streams, yielding one element from each stream alternately.
@ -323,11 +324,11 @@ instance Monoid (WSerialT m a) where
instance Monad m => Monad (WSerialT m) where
return = pure
(WSerialT (Stream m)) >>= f = WSerialT $ Stream $ \_ stp sng yld ->
let run x = (unStream x) Nothing stp sng yld
(WSerialT (Stream m)) >>= f = WSerialT $ Stream $ \st stp sng yld ->
let run x = (unStream x) (rstState st) stp sng yld
single a = run $ toStream (f a)
yieldk a r = run $ toStream $ f a <> (fromStream r >>= f)
in m Nothing stp single yieldk
in m (rstState st) stp single yieldk
------------------------------------------------------------------------------
-- Other instances

View File

@ -128,7 +128,7 @@ import Prelude
takeWhile, drop, dropWhile, all, any, maximum, minimum, elem,
notElem, null, head, tail, zipWith)
import Streamly.SVar (MonadAsync)
import Streamly.SVar (MonadAsync, State(..), defState, rstState)
import qualified Streamly.Streams.StreamK as K
------------------------------------------------------------------------------
@ -145,9 +145,10 @@ instance Functor (Step s) where
fmap f (Yield x s) = Yield (f x) s
fmap _ Stop = Stop
-- gst = global state
-- | A stream consists of a step function that generates the next step given a
-- current state, and the current state.
data Stream m a = forall s. Stream (s -> m (Step s a)) s
data Stream m a = forall s. Stream (State K.Stream m a -> s -> m (Step s a)) s
------------------------------------------------------------------------------
-- Construction
@ -156,15 +157,15 @@ data Stream m a = forall s. Stream (s -> m (Step s a)) s
-- | An empty 'Stream'.
{-# INLINE_NORMAL nil #-}
nil :: Monad m => Stream m a
nil = Stream (const $ return Stop) ()
nil = Stream (\_ _ -> return Stop) ()
-- | Can fuse but has O(n^2) complexity.
cons :: Monad m => a -> Stream m a -> Stream m a
cons x (Stream step state) = Stream step1 Nothing
where
step1 Nothing = return $ Yield x (Just state)
step1 (Just st) = do
r <- step st
step1 _ Nothing = return $ Yield x (Just state)
step1 gst (Just st) = do
r <- step (rstState gst) st
case r of
Yield a s -> return $ Yield a (Just s)
Stop -> return Stop
@ -179,7 +180,7 @@ uncons :: Monad m => Stream m a -> m (Maybe (a, Stream m a))
uncons (Stream step state) = go state
where
go st = do
r <- step st
r <- step defState st
return $ case r of
Yield x s -> Just (x, (Stream step s))
Stop -> Nothing
@ -193,7 +194,7 @@ unfoldrM :: Monad m => (s -> m (Maybe (a, s))) -> s -> Stream m a
unfoldrM next state = Stream step state
where
{-# INLINE_LATE step #-}
step st = do
step _ st = do
r <- next st
return $ case r of
Just (x, s) -> Yield x s
@ -208,7 +209,7 @@ unfoldr f = unfoldrM (return . f)
------------------------------------------------------------------------------
repeat :: Monad m => a -> Stream m a
repeat x = Stream (const $ return $ Yield x ()) ()
repeat x = Stream (\_ _ -> return $ Yield x ()) ()
{-# INLINE_NORMAL enumFromStepN #-}
enumFromStepN :: (Num a, Monad m) => a -> a -> Int -> Stream m a
@ -216,7 +217,7 @@ enumFromStepN from stride n =
from `seq` stride `seq` n `seq` Stream step (from, n)
where
{-# INLINE_LATE step #-}
step (x, i) | i > 0 = return $ Yield x (x + stride, i - 1)
step _ (x, i) | i > 0 = return $ Yield x (x + stride, i - 1)
| otherwise = return $ Stop
-------------------------------------------------------------------------------
@ -226,11 +227,11 @@ enumFromStepN from stride n =
-- | Create a singleton 'Stream' from a pure value.
{-# INLINE_NORMAL yield #-}
yield :: Monad m => a -> Stream m a
yield x = Stream (return . step) True
yield x = Stream (\_ s -> return $ step undefined s) True
where
{-# INLINE_LATE step #-}
step True = Yield x False
step False = Stop
step _ True = Yield x False
step _ False = Stop
-- | Create a singleton 'Stream' from a monadic action.
{-# INLINE_NORMAL yieldM #-}
@ -238,8 +239,8 @@ yieldM :: Monad m => m a -> Stream m a
yieldM m = Stream step True
where
{-# INLINE_LATE step #-}
step True = m >>= \x -> return $ Yield x False
step False = return Stop
step _ True = m >>= \x -> return $ Yield x False
step _ False = return Stop
-- XXX we need the MonadAsync constraint because of a rewrite rule.
-- | Convert a list of monadic actions to a 'Stream'
@ -248,8 +249,8 @@ fromListM :: MonadAsync m => [m a] -> Stream m a
fromListM zs = Stream step zs
where
{-# INLINE_LATE step #-}
step (m:ms) = m >>= \x -> return $ Yield x ms
step [] = return Stop
step _ (m:ms) = m >>= \x -> return $ Yield x ms
step _ [] = return Stop
-- | Convert a list of pure values to a 'Stream'
{-# INLINE_LATE fromList #-}
@ -257,18 +258,19 @@ fromList :: Monad m => [a] -> Stream m a
fromList zs = Stream step zs
where
{-# INLINE_LATE step #-}
step (x:xs) = return $ Yield x xs
step [] = return Stop
step _ (x:xs) = return $ Yield x xs
step _ [] = return Stop
-- XXX pass the state to streamD
{-# INLINE_LATE fromStreamK #-}
fromStreamK :: Monad m => K.Stream m a -> Stream m a
fromStreamK m = Stream step m
where
step m1 =
step gst m1 =
let stop = return Stop
single a = return $ Yield a K.nil
yieldk a r = return $ Yield a r
in K.unStream m1 Nothing stop single yieldk
in K.unStream m1 gst stop single yieldk
------------------------------------------------------------------------------
-- Elimination by Folds
@ -279,7 +281,7 @@ foldrM :: Monad m => (a -> b -> m b) -> b -> Stream m a -> m b
foldrM f z (Stream step state) = go SPEC state
where
go !_ st = do
r <- step st
r <- step defState st
case r of
Yield x s -> go SPEC s >>= f x
Stop -> return z
@ -293,7 +295,7 @@ foldlM' :: Monad m => (b -> a -> m b) -> b -> Stream m a -> m b
foldlM' fstep begin (Stream step state) = go SPEC begin state
where
go !_ acc st = acc `seq` do
r <- step st
r <- step defState st
case r of
Yield x s -> do
acc' <- fstep acc x
@ -314,7 +316,7 @@ runStream :: Monad m => Stream m a -> m ()
runStream (Stream step state) = go SPEC state
where
go !_ st = do
r <- step st
r <- step defState st
case r of
Yield _ s -> go SPEC s
Stop -> return ()
@ -324,7 +326,7 @@ null :: Monad m => Stream m a -> m Bool
null (Stream step state) = go state
where
go st = do
r <- step st
r <- step defState st
case r of
Yield _ _ -> return False
Stop -> return True
@ -335,7 +337,7 @@ head :: Monad m => Stream m a -> m (Maybe a)
head (Stream step state) = go state
where
go st = do
r <- step st
r <- step defState st
case r of
Yield x _ -> return (Just x)
Stop -> return Nothing
@ -346,7 +348,7 @@ tail :: Monad m => Stream m a -> m (Maybe (Stream m a))
tail (Stream step state) = go state
where
go st = do
r <- step st
r <- step defState st
case r of
Yield _ s -> return (Just $ Stream step s)
Stop -> return Nothing
@ -361,7 +363,7 @@ elem :: (Monad m, Eq a) => a -> Stream m a -> m Bool
elem e (Stream step state) = go state
where
go st = do
r <- step st
r <- step defState st
case r of
Yield x s ->
if x == e
@ -374,7 +376,7 @@ notElem :: (Monad m, Eq a) => a -> Stream m a -> m Bool
notElem e (Stream step state) = go state
where
go st = do
r <- step st
r <- step defState st
case r of
Yield x s ->
if x == e
@ -387,7 +389,7 @@ all :: Monad m => (a -> Bool) -> Stream m a -> m Bool
all p (Stream step state) = go state
where
go st = do
r <- step st
r <- step defState st
case r of
Yield x s ->
if p x
@ -400,7 +402,7 @@ any :: Monad m => (a -> Bool) -> Stream m a -> m Bool
any p (Stream step state) = go state
where
go st = do
r <- step st
r <- step defState st
case r of
Yield x s ->
if p x
@ -413,12 +415,12 @@ maximum :: (Monad m, Ord a) => Stream m a -> m (Maybe a)
maximum (Stream step state) = go Nothing state
where
go Nothing st = do
r <- step st
r <- step defState st
case r of
Yield x s -> go (Just x) s
Stop -> return Nothing
go (Just acc) st = do
r <- step st
r <- step defState st
case r of
Yield x s ->
if acc <= x
@ -431,12 +433,12 @@ minimum :: (Monad m, Ord a) => Stream m a -> m (Maybe a)
minimum (Stream step state) = go Nothing state
where
go Nothing st = do
r <- step st
r <- step defState st
case r of
Yield x s -> go (Just x) s
Stop -> return Nothing
go (Just acc) st = do
r <- step st
r <- step defState st
case r of
Yield x s ->
if acc <= x
@ -466,8 +468,8 @@ toList = foldr (:) []
toStreamK :: Monad m => Stream m a -> K.Stream m a
toStreamK (Stream step state) = go state
where
go st = K.Stream $ \_ stp _ yld -> do
r <- step st
go st = K.Stream $ \gst stp _ yld -> do
r <- step gst st
case r of
Yield x s -> yld x (go s)
Stop -> stp
@ -489,8 +491,8 @@ postscanlM' fstep begin (Stream step state) =
begin `seq` Stream step' (state, begin)
where
{-# INLINE_LATE step' #-}
step' (st, acc) = acc `seq` do
r <- step st
step' gst (st, acc) = acc `seq` do
r <- step (rstState gst) st
case r of
Yield x s -> do
y <- fstep acc x
@ -510,20 +512,20 @@ take :: Monad m => Int -> Stream m a -> Stream m a
take n (Stream step state) = n `seq` Stream step' (state, 0)
where
{-# INLINE_LATE step' #-}
step' (st, i) | i < n = do
r <- step st
step' gst (st, i) | i < n = do
r <- step (rstState gst) st
return $ case r of
Yield x s -> Yield x (s, i + 1)
Stop -> Stop
step' (_, _) = return Stop
step' _ (_, _) = return Stop
{-# INLINE_NORMAL takeWhileM #-}
takeWhileM :: Monad m => (a -> m Bool) -> Stream m a -> Stream m a
takeWhileM f (Stream step state) = Stream step' state
where
{-# INLINE_LATE step' #-}
step' st = do
r <- step st
step' gst st = do
r <- step (rstState gst) st
case r of
Yield x s -> do
b <- f x
@ -539,16 +541,16 @@ drop :: Monad m => Int -> Stream m a -> Stream m a
drop n (Stream step state) = Stream step' (state, Just n)
where
{-# INLINE_LATE step' #-}
step' (st, Just i)
step' gst (st, Just i)
| i > 0 = do
r <- step st
r <- step (rstState gst) st
case r of
Yield _ s -> step' (s, Just (i - 1))
Yield _ s -> step' (rstState gst) (s, Just (i - 1))
Stop -> return Stop
| otherwise = step' (st, Nothing)
| otherwise = step' gst (st, Nothing)
step' (st, Nothing) = do
r <- step st
step' gst (st, Nothing) = do
r <- step (rstState gst) st
return $ case r of
Yield x s -> Yield x (s, Nothing)
Stop -> Stop
@ -563,23 +565,23 @@ dropWhileM :: Monad m => (a -> m Bool) -> Stream m a -> Stream m a
dropWhileM f (Stream step state) = Stream step' (DropWhileDrop state)
where
{-# INLINE_LATE step' #-}
step' (DropWhileDrop st) = do
r <- step st
step' gst (DropWhileDrop st) = do
r <- step (rstState gst) st
case r of
Yield x s -> do
b <- f x
if b
then step' (DropWhileDrop s)
else step' (DropWhileYield x s)
then step' (rstState gst) (DropWhileDrop s)
else step' (rstState gst) (DropWhileYield x s)
Stop -> return Stop
step' (DropWhileNext st) = do
r <- step st
step' gst (DropWhileNext st) = do
r <- step (rstState gst) st
case r of
Yield x s -> step' (DropWhileYield x s)
Yield x s -> step' (rstState gst) (DropWhileYield x s)
Stop -> return Stop
step' (DropWhileYield x st) = return $ Yield x (DropWhileNext st)
step' _ (DropWhileYield x st) = return $ Yield x (DropWhileNext st)
{-# INLINE dropWhile #-}
dropWhile :: Monad m => (a -> Bool) -> Stream m a -> Stream m a
@ -590,14 +592,14 @@ filterM :: Monad m => (a -> m Bool) -> Stream m a -> Stream m a
filterM f (Stream step state) = Stream step' state
where
{-# INLINE_LATE step' #-}
step' st = do
r <- step st
step' gst st = do
r <- step (rstState gst) st
case r of
Yield x s -> do
b <- f x
if b
then return $ Yield x s
else step' s
else step' (rstState gst) s
Stop -> return $ Stop
{-# INLINE filter #-}
@ -614,8 +616,8 @@ mapM :: Monad m => (a -> m b) -> Stream m a -> Stream m b
mapM f (Stream step state) = Stream step' state
where
{-# INLINE_LATE step' #-}
step' st = do
r <- step st
step' gst st = do
r <- step (rstState gst) st
case r of
Yield x s -> f x >>= \a -> return $ Yield a s
Stop -> return Stop
@ -647,14 +649,14 @@ zipWithM :: Monad m
zipWithM f (Stream stepa ta) (Stream stepb tb) = Stream step (ta, tb, Nothing)
where
{-# INLINE_LATE step #-}
step (sa, sb, Nothing) = do
r <- stepa sa
step gst (sa, sb, Nothing) = do
r <- stepa (rstState gst) sa
case r of
Yield x sa' -> step (sa', sb, Just x)
Yield x sa' -> step gst (sa', sb, Just x)
Stop -> return Stop
step (sa, sb, Just x) = do
r <- stepb sb
step gst (sa, sb, Just x) = do
r <- stepb (rstState gst) sb
case r of
Yield y sb' -> do
z <- f x y

View File

@ -166,7 +166,7 @@ import Streamly.SVar
newtype Stream m a =
Stream {
unStream :: forall r.
Maybe (SVar Stream m a) -- local state
State Stream m a -- state
-> m r -- stop
-> (a -> m r) -- singleton
-> (a -> Stream m a -> m r) -- yield
@ -247,15 +247,15 @@ adapt = fromStream . toStream
-- | Build a stream from an 'SVar', a stop continuation, a singleton stream
-- continuation and a yield continuation.
mkStream:: IsStream t
=> (forall r. Maybe (SVar Stream m a)
=> (forall r. State Stream m a
-> m r
-> (a -> m r)
-> (a -> t m a -> m r)
-> m r)
-> t m a
mkStream k = fromStream $ Stream $ \svr stp sng yld ->
mkStream k = fromStream $ Stream $ \st stp sng yld ->
let yieldk a r = yld a (toStream r)
in k svr stp sng yieldk
in k (rstState st) stp sng yieldk
------------------------------------------------------------------------------
-- Construction
@ -353,7 +353,7 @@ uncons m =
let stop = return Nothing
single a = return (Just (a, nil))
yieldk a r = return (Just (a, fromStream r))
in (unStream (toStream m)) Nothing stop single yieldk
in (unStream (toStream m)) defState stop single yieldk
-------------------------------------------------------------------------------
-- Generation
@ -450,15 +450,15 @@ fromStreamK = id
-- continuation and a yield continuation.
foldStream
:: IsStream t
=> Maybe (SVar Stream m a)
=> State Stream m a
-> m r
-> (a -> m r)
-> (a -> t m a -> m r)
-> t m a
-> m r
foldStream svr blank single step m =
foldStream st blank single step m =
let yieldk a x = step a (fromStream x)
in (unStream (toStream m)) svr blank single yieldk
in (unStream (toStream m)) st blank single yieldk
-- | Lazy right associative fold.
foldr :: (IsStream t, Monad m) => (a -> b -> b) -> b -> t m a -> m b
@ -468,7 +468,7 @@ foldr step acc m = go (toStream m)
let stop = return acc
single a = return (step a acc)
yieldk a r = go r >>= \b -> return (step a b)
in (unStream m1) Nothing stop single yieldk
in (unStream m1) defState stop single yieldk
-- | Lazy right fold with a monadic step function.
{-# INLINE foldrM #-}
@ -479,7 +479,7 @@ foldrM step acc m = go (toStream m)
let stop = return acc
single a = step a acc
yieldk a r = go r >>= step a
in (unStream m1) Nothing stop single yieldk
in (unStream m1) defState stop single yieldk
-- | Strict left fold with an extraction function. Like the standard strict
-- left fold, but applies a user supplied extraction function (the third
@ -493,7 +493,7 @@ foldx step begin done m = get $ go (toStream m) begin
{-# NOINLINE get #-}
get m1 =
let single = return . done
in (unStream m1) Nothing undefined single undefined
in (unStream m1) undefined undefined single undefined
-- Note, this can be implemented by making a recursive call to "go",
-- however that is more expensive because of unnecessary recursion
@ -504,8 +504,8 @@ foldx step begin done m = get $ go (toStream m) begin
single a = sng $ step acc a
yieldk a r =
let stream = go r (step acc a)
in (unStream stream) Nothing undefined sng yld
in (unStream m1) Nothing stop single yieldk
in (unStream stream) defState undefined sng yld
in (unStream m1) defState stop single yieldk
-- | Strict left associative fold.
{-# INLINE foldl' #-}
@ -522,7 +522,7 @@ foldxM step begin done m = go begin (toStream m)
let stop = acc >>= done
single a = acc >>= \b -> step b a >>= done
yieldk a r = acc >>= \b -> go (step b a) r
in (unStream m1) Nothing stop single yieldk
in (unStream m1) defState stop single yieldk
-- | Like 'foldl'' but with a monadic step function.
foldlM' :: (IsStream t, Monad m) => (b -> a -> m b) -> b -> t m a -> m b
@ -540,7 +540,7 @@ runStream m = go (toStream m)
let stop = return ()
single _ = return ()
yieldk _ r = go (toStream r)
in unStream m1 Nothing stop single yieldk
in unStream m1 defState stop single yieldk
{-# INLINE null #-}
null :: (IsStream t, Monad m) => t m a -> m Bool
@ -548,7 +548,7 @@ null m =
let stop = return True
single _ = return False
yieldk _ _ = return False
in unStream (toStream m) Nothing stop single yieldk
in unStream (toStream m) defState stop single yieldk
{-# INLINE head #-}
head :: (IsStream t, Monad m) => t m a -> m (Maybe a)
@ -556,7 +556,7 @@ head m =
let stop = return Nothing
single a = return (Just a)
yieldk a _ = return (Just a)
in unStream (toStream m) Nothing stop single yieldk
in unStream (toStream m) defState stop single yieldk
{-# INLINE tail #-}
tail :: (IsStream t, Monad m) => t m a -> m (Maybe (t m a))
@ -564,7 +564,7 @@ tail m =
let stop = return Nothing
single _ = return $ Just nil
yieldk _ r = return $ Just $ fromStream r
in unStream (toStream m) Nothing stop single yieldk
in unStream (toStream m) defState stop single yieldk
{-# INLINE elem #-}
elem :: (IsStream t, Monad m, Eq a) => a -> t m a -> m Bool
@ -574,7 +574,7 @@ elem e m = go (toStream m)
let stop = return False
single a = return (a == e)
yieldk a r = if a == e then return True else go r
in (unStream m1) Nothing stop single yieldk
in (unStream m1) defState stop single yieldk
{-# INLINE notElem #-}
notElem :: (IsStream t, Monad m, Eq a) => a -> t m a -> m Bool
@ -584,7 +584,7 @@ notElem e m = go (toStream m)
let stop = return True
single a = return (a /= e)
yieldk a r = if a == e then return False else go r
in (unStream m1) Nothing stop single yieldk
in (unStream m1) defState stop single yieldk
all :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> m Bool
all p m = go (toStream m)
@ -594,7 +594,7 @@ all p m = go (toStream m)
| otherwise = return False
yieldk a r | p a = go r
| otherwise = return False
in unStream m1 Nothing (return True) single yieldk
in unStream m1 defState (return True) single yieldk
any :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> m Bool
any p m = go (toStream m)
@ -604,7 +604,7 @@ any p m = go (toStream m)
| otherwise = return False
yieldk a r | p a = return True
| otherwise = go r
in unStream m1 Nothing (return False) single yieldk
in unStream m1 defState (return False) single yieldk
-- | Extract the last element of the stream, if any.
{-# INLINE last #-}
@ -619,7 +619,7 @@ minimum m = go Nothing (toStream m)
let stop = return Nothing
single a = return (Just a)
yieldk a r = go (Just a) r
in unStream m1 Nothing stop single yieldk
in unStream m1 defState stop single yieldk
go (Just res) m1 =
let stop = return (Just res)
@ -631,7 +631,7 @@ minimum m = go Nothing (toStream m)
if res <= a
then go (Just res) r
else go (Just a) r
in unStream m1 Nothing stop single yieldk
in unStream m1 defState stop single yieldk
{-# INLINE maximum #-}
maximum :: (IsStream t, Monad m, Ord a) => t m a -> m (Maybe a)
@ -641,7 +641,7 @@ maximum m = go Nothing (toStream m)
let stop = return Nothing
single a = return (Just a)
yieldk a r = go (Just a) r
in unStream m1 Nothing stop single yieldk
in unStream m1 defState stop single yieldk
go (Just res) m1 =
let stop = return (Just res)
@ -653,7 +653,7 @@ maximum m = go Nothing (toStream m)
if res <= a
then go (Just a) r
else go (Just res) r
in unStream m1 Nothing stop single yieldk
in unStream m1 defState stop single yieldk
------------------------------------------------------------------------------
-- Map and Fold
@ -668,7 +668,7 @@ mapM_ f m = go (toStream m)
let stop = return ()
single a = void (f a)
yieldk a r = f a >> go r
in (unStream m1) Nothing stop single yieldk
in (unStream m1) defState stop single yieldk
------------------------------------------------------------------------------
-- Converting folds
@ -691,12 +691,12 @@ scanx :: IsStream t => (x -> a -> x) -> x -> (x -> b) -> t m a -> t m b
scanx step begin done m =
cons (done begin) $ fromStream $ go (toStream m) begin
where
go m1 !acc = Stream $ \_ stp sng yld ->
go m1 !acc = Stream $ \st stp sng yld ->
let single a = sng (done $ step acc a)
yieldk a r =
let s = step acc a
in yld (done s) (go r s)
in unStream m1 Nothing stp single yieldk
in unStream m1 (rstState st) stp single yieldk
{-# INLINE scanl' #-}
scanl' :: IsStream t => (b -> a -> b) -> b -> t m a -> t m b
@ -710,53 +710,56 @@ scanl' step begin m = scanx step begin id m
filter :: IsStream t => (a -> Bool) -> t m a -> t m a
filter p m = fromStream $ go (toStream m)
where
go m1 = Stream $ \_ stp sng yld ->
go m1 = Stream $ \st stp sng yld ->
let single a | p a = sng a
| otherwise = stp
yieldk a r | p a = yld a (go r)
| otherwise = (unStream r) Nothing stp single yieldk
in unStream m1 Nothing stp single yieldk
| otherwise = (unStream r) (rstState st) stp single yieldk
in unStream m1 (rstState st) stp single yieldk
{-# INLINE take #-}
take :: IsStream t => Int -> t m a -> t m a
take n m = fromStream $ go n (toStream m)
where
go n1 m1 = Stream $ \_ stp sng yld ->
go n1 m1 = Stream $ \st stp sng yld ->
let yieldk a r = yld a (go (n1 - 1) r)
in if n1 <= 0 then stp else unStream m1 Nothing stp sng yieldk
in if n1 <= 0
then stp
else unStream m1 (rstState st) stp sng yieldk
{-# INLINE takeWhile #-}
takeWhile :: IsStream t => (a -> Bool) -> t m a -> t m a
takeWhile p m = fromStream $ go (toStream m)
where
go m1 = Stream $ \_ stp sng yld ->
go m1 = Stream $ \st stp sng yld ->
let single a | p a = sng a
| otherwise = stp
yieldk a r | p a = yld a (go r)
| otherwise = stp
in unStream m1 Nothing stp single yieldk
in unStream m1 (rstState st) stp single yieldk
drop :: IsStream t => Int -> t m a -> t m a
drop n m = fromStream $ go n (toStream m)
drop n m = fromStream $ Stream $ \st stp sng yld ->
unStream (go n (toStream m)) (rstState st) stp sng yld
where
go n1 m1 = Stream $ \_ stp sng yld ->
go n1 m1 = Stream $ \st stp sng yld ->
let single _ = stp
yieldk _ r = (unStream $ go (n1 - 1) r) Nothing stp sng yld
yieldk _ r = (unStream $ go (n1 - 1) r) st stp sng yld
-- Somehow "<=" check performs better than a ">"
in if n1 <= 0
then unStream m1 Nothing stp sng yld
else unStream m1 Nothing stp single yieldk
then unStream m1 st stp sng yld
else unStream m1 st stp single yieldk
{-# INLINE dropWhile #-}
dropWhile :: IsStream t => (a -> Bool) -> t m a -> t m a
dropWhile p m = fromStream $ go (toStream m)
where
go m1 = Stream $ \_ stp sng yld ->
go m1 = Stream $ \st stp sng yld ->
let single a | p a = stp
| otherwise = sng a
yieldk a r | p a = (unStream r) Nothing stp single yieldk
yieldk a r | p a = (unStream r) (rstState st) stp single yieldk
| otherwise = yld a r
in unStream m1 Nothing stp single yieldk
in unStream m1 (rstState st) stp single yieldk
-------------------------------------------------------------------------------
-- Mapping
@ -764,10 +767,10 @@ dropWhile p m = fromStream $ go (toStream m)
{-# INLINE map #-}
map :: (IsStream t, Monad m) => (a -> b) -> t m a -> t m b
map f m = fromStream $ Stream $ \_ stp sng yld ->
map f m = fromStream $ Stream $ \st stp sng yld ->
let single = sng . f
yieldk a r = yld (f a) (fmap f r)
in unStream (toStream m) Nothing stp single yieldk
in unStream (toStream m) (rstState st) stp single yieldk
-- Be careful when modifying this, this uses a consM (|:) deliberately to allow
-- other stream types to overload it.
@ -775,10 +778,10 @@ map f m = fromStream $ Stream $ \_ stp sng yld ->
mapM :: (IsStream t, MonadAsync m) => (a -> m b) -> t m a -> t m b
mapM f m = go (toStream m)
where
go m1 = fromStream $ Stream $ \svr stp sng yld ->
go m1 = fromStream $ Stream $ \st stp sng yld ->
let single a = f a >>= sng
yieldk a r = unStream (toStream (f a |: (go r))) svr stp sng yld
in (unStream m1) Nothing stp single yieldk
yieldk a r = unStream (toStream (f a |: (go r))) st stp sng yld
in (unStream m1) (rstState st) stp single yieldk
-- Be careful when modifying this, this uses a consM (|:) deliberately to allow
-- other stream types to overload it.
@ -786,10 +789,10 @@ mapM f m = go (toStream m)
sequence :: (IsStream t, MonadAsync m) => t m (m a) -> t m a
sequence m = go (toStream m)
where
go m1 = fromStream $ Stream $ \svr stp sng yld ->
go m1 = fromStream $ Stream $ \st stp sng yld ->
let single ma = ma >>= sng
yieldk ma r = unStream (toStream $ ma |: go r) svr stp sng yld
in (unStream m1) Nothing stp single yieldk
yieldk ma r = unStream (toStream $ ma |: go r) st stp sng yld
in (unStream m1) (rstState st) stp single yieldk
-------------------------------------------------------------------------------
-- Map and Filter
@ -799,14 +802,14 @@ sequence m = go (toStream m)
mapMaybe :: IsStream t => (a -> Maybe b) -> t m a -> t m b
mapMaybe f m = go (toStream m)
where
go m1 = fromStream $ Stream $ \_ stp sng yld ->
go m1 = fromStream $ Stream $ \st stp sng yld ->
let single a = case f a of
Just b -> sng b
Nothing -> stp
yieldk a r = case f a of
Just b -> yld b (toStream $ go r)
Nothing -> (unStream r) Nothing stp single yieldk
in unStream m1 Nothing stp single yieldk
Nothing -> (unStream r) (rstState st) stp single yieldk
in unStream m1 (rstState st) stp single yieldk
------------------------------------------------------------------------------
-- Semigroup
@ -818,11 +821,11 @@ mapMaybe f m = go (toStream m)
serial :: Stream m a -> Stream m a -> Stream m a
serial m1 m2 = go m1
where
go (Stream m) = Stream $ \_ stp sng yld ->
let stop = (unStream m2) Nothing stp sng yld
go (Stream m) = Stream $ \st stp sng yld ->
let stop = (unStream m2) (rstState st) stp sng yld
single a = yld a m2
yieldk a r = yld a (go r)
in m Nothing stop single yieldk
in m (rstState st) stop single yieldk
instance Semigroup (Stream m a) where
(<>) = serial
@ -855,20 +858,20 @@ bindWith
bindWith par m f = go m
where
go (Stream g) =
Stream $ \ctx stp sng yld ->
let run x = (unStream x) ctx stp sng yld
Stream $ \st stp sng yld ->
let run x = (unStream x) st stp sng yld
single a = run $ f a
yieldk a r = run $ f a `par` go r
in g Nothing stp single yieldk
in g (rstState st) stp single yieldk
------------------------------------------------------------------------------
-- Alternative & MonadPlus
------------------------------------------------------------------------------
_alt :: Stream m a -> Stream m a -> Stream m a
_alt m1 m2 = Stream $ \_ stp sng yld ->
let stop = unStream m2 Nothing stp sng yld
in unStream m1 Nothing stop sng yld
_alt m1 m2 = Stream $ \st stp sng yld ->
let stop = unStream m2 (rstState st) stp sng yld
in unStream m1 (rstState st) stop sng yld
------------------------------------------------------------------------------
-- MonadReader
@ -876,10 +879,10 @@ _alt m1 m2 = Stream $ \_ stp sng yld ->
withLocal :: MonadReader r m => (r -> r) -> Stream m a -> Stream m a
withLocal f m =
Stream $ \_ stp sng yld ->
Stream $ \st stp sng yld ->
let single = local f . sng
yieldk a r = local f $ yld a (withLocal f r)
in (unStream m) Nothing (local f stp) single yieldk
in (unStream m) (rstState st) (local f stp) single yieldk
------------------------------------------------------------------------------
-- MonadError

View File

@ -42,9 +42,9 @@ import Data.Semigroup (Semigroup(..))
import Prelude hiding (map, repeat, zipWith)
import Streamly.Streams.StreamK (IsStream(..), Stream(..))
import Streamly.Streams.Async (mkAsync)
import Streamly.Streams.Async (mkAsync')
import Streamly.Streams.Serial (map)
import Streamly.SVar (MonadAsync)
import Streamly.SVar (MonadAsync, rstState)
import qualified Streamly.Streams.StreamK as K
@ -58,14 +58,14 @@ import qualified Streamly.Streams.StreamK as K
zipWithS :: (a -> b -> c) -> Stream m a -> Stream m b -> Stream m c
zipWithS f m1 m2 = go m1 m2
where
go mx my = Stream $ \_ stp sng yld -> do
go mx my = Stream $ \st stp sng yld -> do
let merge a ra =
let single2 b = sng (f a b)
yield2 b rb = yld (f a b) (go ra rb)
in unStream my Nothing stp single2 yield2
in unStream my (rstState st) stp single2 yield2
let single1 a = merge a K.nil
yield1 a ra = merge a ra
unStream mx Nothing stp single1 yield1
unStream mx (rstState st) stp single1 yield1
-- | Zip two streams serially using a pure zipping function.
--
@ -80,15 +80,15 @@ zipWith f m1 m2 = fromStream $ zipWithS f (toStream m1) (toStream m2)
zipWithM :: (IsStream t, Monad m) => (a -> b -> m c) -> t m a -> t m b -> t m c
zipWithM f m1 m2 = fromStream $ go (toStream m1) (toStream m2)
where
go mx my = Stream $ \_ stp sng yld -> do
go mx my = Stream $ \st stp sng yld -> do
let merge a ra =
let runIt x = unStream x Nothing stp sng yld
let runIt x = unStream x (rstState st) stp sng yld
single2 b = f a b >>= sng
yield2 b rb = f a b >>= \x -> runIt (x `K.cons` go ra rb)
in unStream my Nothing stp single2 yield2
in unStream my (rstState st) stp single2 yield2
let single1 a = merge a K.nil
yield1 a ra = merge a ra
unStream mx Nothing stp single1 yield1
unStream mx (rstState st) stp single1 yield1
------------------------------------------------------------------------------
-- Serially Zipping Streams
@ -169,10 +169,10 @@ instance Monad m => Applicative (ZipSerialM m) where
-- @since 0.1.0
zipAsyncWith :: (IsStream t, MonadAsync m)
=> (a -> b -> c) -> t m a -> t m b -> t m c
zipAsyncWith f m1 m2 = fromStream $ Stream $ \_ stp sng yld -> do
ma <- mkAsync m1
mb <- mkAsync m2
unStream (toStream (zipWith f ma mb)) Nothing stp sng yld
zipAsyncWith f m1 m2 = fromStream $ Stream $ \st stp sng yld -> do
ma <- mkAsync' (rstState st) m1
mb <- mkAsync' (rstState st) m2
unStream (toStream (zipWith f ma mb)) (rstState st) stp sng yld
-- | Zip two streams asyncly (i.e. both the elements being zipped are generated
-- concurrently) using a monadic zipping function.
@ -180,10 +180,10 @@ zipAsyncWith f m1 m2 = fromStream $ Stream $ \_ stp sng yld -> do
-- @since 0.4.0
zipAsyncWithM :: (IsStream t, MonadAsync m)
=> (a -> b -> m c) -> t m a -> t m b -> t m c
zipAsyncWithM f m1 m2 = fromStream $ Stream $ \_ stp sng yld -> do
ma <- mkAsync m1
mb <- mkAsync m2
unStream (toStream (zipWithM f ma mb)) Nothing stp sng yld
zipAsyncWithM f m1 m2 = fromStream $ Stream $ \st stp sng yld -> do
ma <- mkAsync' (rstState st) m1
mb <- mkAsync' (rstState st) m2
unStream (toStream (zipWithM f ma mb)) (rstState st) stp sng yld
------------------------------------------------------------------------------
-- Parallely Zipping Streams

View File

@ -49,12 +49,15 @@ equals eq stream list = do
constructWithReplicateM
:: IsStream t
=> (t IO Int -> SerialT IO Int)
-> Int
-> Int
-> Word8
-> Property
constructWithReplicateM op len =
constructWithReplicateM op thr buf len =
monadicIO $ do
let x = return (1 :: Int)
stream <- run $ (S.toList . op) (S.replicateM (fromIntegral len) x)
stream <- run $ (S.toList . op) (threads thr $ buffer buf $
S.replicateM (fromIntegral len) x)
list <- run $ replicateM (fromIntegral len) x
equals (==) stream list
@ -383,6 +386,13 @@ transformCombineOpsCommon constr desc t eq = do
prop (desc ++ " filter even") $
transform (filter even) t (S.filter even)
prop (desc ++ " filterM False") $
transform (filter (const False)) t (S.filterM (const $ return False))
prop (desc ++ " filterM True") $
transform (filter (const True)) t (S.filterM (const $ return True))
prop (desc ++ " filterM even") $
transform (filter even) t (S.filterM (return . even))
prop (desc ++ " take maxBound") $
transform (take maxBound) t (S.take maxBound)
prop (desc ++ " take 0") $ transform (take 0) t (S.take 0)
@ -392,6 +402,11 @@ transformCombineOpsCommon constr desc t eq = do
prop (desc ++ " takeWhile False") $
transform (takeWhile (const False)) t (S.takeWhile (const False))
prop (desc ++ " takeWhileM True") $
transform (takeWhile (const True)) t (S.takeWhileM (const $ return True))
prop (desc ++ " takeWhileM False") $
transform (takeWhile (const False)) t (S.takeWhileM (const $ return False))
prop (desc ++ " drop maxBound") $
transform (drop maxBound) t (S.drop maxBound)
prop (desc ++ " drop 0") $ transform (drop 0) t (S.drop 0)
@ -400,8 +415,19 @@ transformCombineOpsCommon constr desc t eq = do
transform (dropWhile (const True)) t (S.dropWhile (const True))
prop (desc ++ " dropWhile False") $
transform (dropWhile (const False)) t (S.dropWhile (const False))
prop (desc ++ " dropWhileM True") $
transform (dropWhile (const True)) t (S.dropWhileM (const $ return True))
prop (desc ++ " dropWhileM False") $
transform (dropWhile (const False)) t (S.dropWhileM (const $ return False))
prop (desc ++ " mapM (+1)") $
transform (map (+1)) t (S.mapM (\x -> return (x + 1)))
prop (desc ++ " scan") $ transform (scanl' (flip const) 0) t
(S.scanl' (flip const) 0)
prop (desc ++ " scanlM'") $ transform (scanl' (flip const) 0) t
(S.scanlM' (\_ a -> return a) 0)
prop (desc ++ " reverse") $ transform reverse t S.reverse
transformCombineOpsOrdered
@ -573,6 +599,23 @@ monadBind constr t eq (a, b) = withMaxSuccess maxTestCount $
let list = a >>= \x -> b >>= return . (+ x)
equals eq stream list
constructionConcurrent :: Int -> Int -> Spec
constructionConcurrent thr buf =
describe (" threads = " ++ show thr ++ "buffer = " ++ show buf) $ do
prop "asyncly replicateM" $ constructWithReplicateM asyncly thr buf
prop "wAsyncly replicateM" $ constructWithReplicateM wAsyncly thr buf
prop "parallely replicateM" $ constructWithReplicateM parallely thr buf
prop "aheadly replicateM" $ constructWithReplicateM aheadly thr buf
-- XXX test all concurrent ops for all these combinations
concurrentAll :: String -> (Int -> Int -> Spec) -> Spec
concurrentAll desc f = do
describe desc $ do
f 0 0 -- default
f 0 1 -- single buffer
f 1 0 -- single thread
f (-1) (-1) -- unbounded threads and buffer
main :: IO ()
main = hspec $ do
let folded :: IsStream t => [a] -> t IO a
@ -582,16 +625,16 @@ main = hspec $ do
_ -> foldMapWith (<>) return xs
)
describe "Construction" $ do
-- XXX test for all types of streams
prop "serially replicateM" $ constructWithReplicateM serially
prop "serially replicateM" $ constructWithReplicateM serially 0 0
it "iterate" $
(S.toList . serially . (S.take 100) $ (S.iterate (+ 1) (0 :: Int)))
`shouldReturn` (take 100 $ iterate (+ 1) 0)
-- XXX test for all types of streams
it "iterateM" $ do
let addM = (\ y -> return (y + 1))
S.toList . serially . (S.take 100) $ S.iterateM addM (0 :: Int)
`shouldReturn` (take 100 $ iterate (+ 1) 0)
concurrentAll "Construction" constructionConcurrent
describe "Functor operations" $ do
functorOps S.fromFoldable "serially" serially (==)
@ -778,6 +821,8 @@ main = hspec $ do
eliminationOps folded "wAsyncly folded" wAsyncly
eliminationOps folded "parallely folded" parallely
-- XXX Add a test where we chain all transformation APIs and make sure that
-- the state is being passed through all of them.
describe "Stream serial elimination operations" $ do
serialEliminationOps S.fromFoldable "serially" serially
serialEliminationOps S.fromFoldable "aheadly" aheadly