mirror of
https://github.com/ilyakooo0/streamly.git
synced 2024-10-26 09:59:48 +03:00
use folds and map from direct style stream
This commit is contained in:
parent
bd11ec2037
commit
3ae953064d
@ -10,6 +10,7 @@
|
||||
* Add `repeat` to generate an infinite stream by repeating a pure value
|
||||
* Add `fromList` and `fromListM` to generate streams from lists, faster than
|
||||
`fromFoldable` and `fromFoldableM`
|
||||
* Add `map` as a synonym of fmap
|
||||
* Significant performance improvements
|
||||
|
||||
## 0.3.0
|
||||
|
@ -15,6 +15,7 @@ import Gauge
|
||||
|
||||
-- We need a monadic bind here to make sure that the function f does not get
|
||||
-- completely optimized out by the compiler in some cases.
|
||||
{-# INLINE benchIO #-}
|
||||
benchIO :: (IsStream t, NFData b) => String -> (t IO Int -> IO b) -> Benchmark
|
||||
benchIO name f = bench name $ nfIO $ randomRIO (1,1000) >>= f . Ops.source
|
||||
|
||||
@ -50,13 +51,18 @@ main = do
|
||||
, benchSrcIO serially "foldMapWithM" Ops.sourceFoldMapWithM
|
||||
]
|
||||
, bgroup "elimination"
|
||||
[ benchIO "toList" Ops.toList
|
||||
, benchIO "fold" Ops.foldl
|
||||
[ benchIO "toNull" $ Ops.toNull serially
|
||||
, benchIO "mapM_" Ops.mapM_
|
||||
, benchIO "toList" Ops.toList
|
||||
, benchIO "foldr" Ops.foldr
|
||||
, benchIO "foldrM" Ops.foldrM
|
||||
, benchIO "foldl'" Ops.foldl
|
||||
, benchIO "last" Ops.last
|
||||
]
|
||||
, bgroup "transformation"
|
||||
[ benchIO "scan" Ops.scan
|
||||
, benchIO "map" Ops.map
|
||||
, benchIO "fmap" Ops.fmap
|
||||
, benchIO "mapM" (Ops.mapM serially)
|
||||
, benchIO "concat" Ops.concat
|
||||
]
|
||||
|
@ -26,6 +26,7 @@ maxValue = value + 1000
|
||||
|
||||
{-# INLINE scan #-}
|
||||
{-# INLINE map #-}
|
||||
{-# INLINE fmap #-}
|
||||
{-# INLINE filterEven #-}
|
||||
{-# INLINE filterAllOut #-}
|
||||
{-# INLINE filterAllIn #-}
|
||||
@ -39,7 +40,7 @@ maxValue = value + 1000
|
||||
{-# INLINE composeAllInFilters #-}
|
||||
{-# INLINE composeAllOutFilters #-}
|
||||
{-# INLINE composeMapAllInFilter #-}
|
||||
scan, map, filterEven, filterAllOut,
|
||||
scan, map, fmap, filterEven, filterAllOut,
|
||||
filterAllIn, takeOne, takeAll, takeWhileTrue, dropAll, dropWhileTrue, zip,
|
||||
concat, composeAllInFilters, composeAllOutFilters,
|
||||
composeMapAllInFilter
|
||||
@ -50,13 +51,17 @@ scan, map, filterEven, filterAllOut,
|
||||
composeMapM :: S.MonadAsync m => Stream m Int -> m ()
|
||||
|
||||
{-# INLINE toList #-}
|
||||
toList :: Monad m => Stream m Int -> m [Int]
|
||||
{-# INLINE foldr #-}
|
||||
{-# INLINE foldrM #-}
|
||||
toList, foldr, foldrM :: Monad m => Stream m Int -> m [Int]
|
||||
{-# INLINE foldl #-}
|
||||
foldl :: Monad m => Stream m Int -> m Int
|
||||
{-# INLINE last #-}
|
||||
last :: Monad m => Stream m Int -> m (Maybe Int)
|
||||
{-# INLINE toNull #-}
|
||||
toNull :: Monad m => (t m Int -> S.SerialT m Int) -> t m Int -> m ()
|
||||
{-# INLINE mapM_ #-}
|
||||
mapM_ :: Monad m => Stream m a -> m ()
|
||||
{-# INLINE mapM #-}
|
||||
mapM :: (S.IsStream t, S.MonadAsync m)
|
||||
=> (t m Int -> S.SerialT m Int) -> t m Int -> m ()
|
||||
@ -72,6 +77,7 @@ type Stream m a = S.SerialT m a
|
||||
{-# INLINE source #-}
|
||||
source :: (S.MonadAsync m, S.IsStream t) => Int -> t m Int
|
||||
source n = S.serially $ sourceUnfoldrM n
|
||||
-- source n = S.serially $ sourceFromList n
|
||||
|
||||
{-# INLINE sourceFromList #-}
|
||||
sourceFromList :: (Monad m, S.IsStream t) => Int -> t m Int
|
||||
@ -126,7 +132,10 @@ runStream = S.runStream
|
||||
-------------------------------------------------------------------------------
|
||||
|
||||
toNull t = runStream . t
|
||||
mapM_ = S.mapM_ (\_ -> return ())
|
||||
toList = S.toList
|
||||
foldr = S.foldr (:) []
|
||||
foldrM = S.foldrM (\a xs -> return (a : xs)) []
|
||||
foldl = S.foldl' (+) 0
|
||||
last = S.last
|
||||
|
||||
@ -139,7 +148,8 @@ transform :: Monad m => Stream m a -> m ()
|
||||
transform = runStream
|
||||
|
||||
scan = transform . S.scanl' (+) 0
|
||||
map = transform . fmap (+1)
|
||||
fmap = transform . Prelude.fmap (+1)
|
||||
map = transform . S.map (+1)
|
||||
mapM t = transform . t . S.mapM return
|
||||
filterEven = transform . S.filter even
|
||||
filterAllOut = transform . S.filter (> maxValue)
|
||||
@ -169,7 +179,8 @@ compose f = transform . f . f . f . f
|
||||
composeMapM = compose (S.mapM return)
|
||||
composeAllInFilters = compose (S.filter (<= maxValue))
|
||||
composeAllOutFilters = compose (S.filter (> maxValue))
|
||||
composeMapAllInFilter = compose (S.filter (<= maxValue) . fmap (subtract 1))
|
||||
composeMapAllInFilter =
|
||||
compose (S.filter (<= maxValue) . Prelude.fmap (subtract 1))
|
||||
|
||||
{-# INLINABLE composeScaling #-}
|
||||
composeScaling :: Monad m => Int -> Stream m Int -> m ()
|
||||
|
@ -47,11 +47,11 @@ module Streamly.Prelude
|
||||
(
|
||||
-- * Construction
|
||||
-- | Primitives to construct or inspect a stream.
|
||||
nil
|
||||
K.nil
|
||||
, K.yield
|
||||
, K.yieldM
|
||||
, cons
|
||||
, (.:)
|
||||
, K.cons
|
||||
, (K..:)
|
||||
, consM
|
||||
, (|:)
|
||||
|
||||
@ -116,6 +116,7 @@ module Streamly.Prelude
|
||||
, reverse
|
||||
|
||||
-- * Mapping
|
||||
, Serial.map
|
||||
, mapM
|
||||
, mapMaybe
|
||||
, mapMaybeM
|
||||
@ -140,25 +141,24 @@ module Streamly.Prelude
|
||||
)
|
||||
where
|
||||
|
||||
import Control.Monad (void)
|
||||
import Control.Monad.IO.Class (MonadIO(..))
|
||||
import Data.Maybe (isJust, fromJust)
|
||||
import Prelude hiding (filter, drop, dropWhile, take,
|
||||
takeWhile, zipWith, foldr, foldl,
|
||||
mapM, mapM_, sequence, all, any,
|
||||
sum, product, elem, notElem,
|
||||
maximum, minimum, head, last,
|
||||
tail, length, null, reverse,
|
||||
iterate)
|
||||
import Control.Monad.IO.Class (MonadIO(..))
|
||||
import Data.Maybe (isJust, fromJust)
|
||||
import Prelude
|
||||
hiding (filter, drop, dropWhile, take, takeWhile, zipWith, foldr,
|
||||
foldl, map, mapM, mapM_, sequence, all, any, sum, product, elem,
|
||||
notElem, maximum, minimum, head, last, tail, length, null,
|
||||
reverse, iterate)
|
||||
import qualified Prelude
|
||||
import qualified System.IO as IO
|
||||
|
||||
import Streamly.Streams.StreamK (IsStream(..), cons, nil, (.:))
|
||||
import Streamly.SVar (MonadAsync)
|
||||
import Streamly.Streams.StreamK (IsStream(..))
|
||||
import Streamly.Streams.Serial (SerialT)
|
||||
import Streamly.Streams.Zip (zipWith, zipWithM, zipAsyncWith, zipAsyncWithM)
|
||||
|
||||
import qualified Streamly.Streams.StreamK as K
|
||||
import qualified Streamly.Streams.StreamD as D
|
||||
import Streamly.Streams.Zip
|
||||
import Streamly.Streams.Serial
|
||||
import Streamly.SVar (MonadAsync)
|
||||
import qualified Streamly.Streams.Serial as Serial
|
||||
|
||||
------------------------------------------------------------------------------
|
||||
-- Construction
|
||||
@ -260,7 +260,7 @@ fromListM = fromStream . D.toStreamK . D.fromListM
|
||||
-- @since 0.3.0
|
||||
{-# INLINE fromFoldableM #-}
|
||||
fromFoldableM :: (IsStream t, MonadAsync m, Foldable f) => f (m a) -> t m a
|
||||
fromFoldableM = Prelude.foldr consM nil
|
||||
fromFoldableM = Prelude.foldr consM K.nil
|
||||
|
||||
-- | Same as 'fromFoldable'.
|
||||
--
|
||||
@ -284,7 +284,7 @@ each = K.fromFoldable
|
||||
replicateM :: (IsStream t, MonadAsync m) => Int -> m a -> t m a
|
||||
replicateM n m = go n
|
||||
where
|
||||
go cnt = if cnt <= 0 then nil else m |: go (cnt - 1)
|
||||
go cnt = if cnt <= 0 then K.nil else m |: go (cnt - 1)
|
||||
|
||||
-- | Generate a stream by repeatedly executing a monadic action forever.
|
||||
--
|
||||
@ -360,14 +360,11 @@ fromHandle h = fromStream go
|
||||
-- @
|
||||
--
|
||||
-- @since 0.1.0
|
||||
{-# INLINE foldr #-}
|
||||
foldr :: Monad m => (a -> b -> b) -> b -> SerialT m a -> m b
|
||||
foldr step acc m = go (toStream m)
|
||||
where
|
||||
go m1 =
|
||||
let stop = return acc
|
||||
single a = return (step a acc)
|
||||
yieldk a r = go r >>= \b -> return (step a b)
|
||||
in (K.unStream m1) Nothing stop single yieldk
|
||||
-- XXX somehow this definition does not perform well, need to investigate
|
||||
-- foldr step acc m = D.foldr step acc $ D.fromStreamK (toStream m)
|
||||
foldr f = foldrM (\a b -> return (f a b))
|
||||
|
||||
-- | Lazy right fold with a monadic step function. For example, to fold a
|
||||
-- stream into a list:
|
||||
@ -380,13 +377,7 @@ foldr step acc m = go (toStream m)
|
||||
-- @since 0.2.0
|
||||
{-# INLINE foldrM #-}
|
||||
foldrM :: Monad m => (a -> b -> m b) -> b -> SerialT m a -> m b
|
||||
foldrM step acc m = go (toStream m)
|
||||
where
|
||||
go m1 =
|
||||
let stop = return acc
|
||||
single a = step a acc
|
||||
yieldk a r = go r >>= step a
|
||||
in (K.unStream m1) Nothing stop single yieldk
|
||||
foldrM step acc m = D.foldrM step acc $ D.fromStreamK (toStream m)
|
||||
|
||||
-- | Strict left scan with an extraction function. Like 'scanl'', but applies a
|
||||
-- user supplied extraction function (the third argument) at each step. This is
|
||||
@ -397,7 +388,7 @@ foldrM step acc m = go (toStream m)
|
||||
{-# INLINE scanx #-}
|
||||
scanx :: IsStream t => (x -> a -> x) -> x -> (x -> b) -> t m a -> t m b
|
||||
scanx step begin done m =
|
||||
cons (done begin) $ fromStream $ go (toStream m) begin
|
||||
K.cons (done begin) $ fromStream $ go (toStream m) begin
|
||||
where
|
||||
go m1 !acc = K.Stream $ \_ stp sng yld ->
|
||||
let single a = sng (done $ step acc a)
|
||||
@ -430,24 +421,7 @@ scanl' step begin m = scanx step begin id m
|
||||
-- @since 0.2.0
|
||||
{-# INLINE foldx #-}
|
||||
foldx :: Monad m => (x -> a -> x) -> x -> (x -> b) -> SerialT m a -> m b
|
||||
foldx step begin done m = get $ go (toStream m) begin
|
||||
where
|
||||
{-# NOINLINE get #-}
|
||||
get m1 =
|
||||
let single = return . done
|
||||
in (K.unStream m1) Nothing undefined single undefined
|
||||
|
||||
-- Note, this can be implemented by making a recursive call to "go",
|
||||
-- however that is more expensive because of unnecessary recursion
|
||||
-- that cannot be tail call optimized. Unfolding recursion explicitly via
|
||||
-- continuations is much more efficient.
|
||||
go m1 !acc = K.Stream $ \_ _ sng yld ->
|
||||
let stop = sng acc
|
||||
single a = sng $ step acc a
|
||||
yieldk a r =
|
||||
let stream = go r (step acc a)
|
||||
in (K.unStream stream) Nothing undefined sng yld
|
||||
in (K.unStream m1) Nothing stop single yieldk
|
||||
foldx = K.foldx
|
||||
|
||||
-- |
|
||||
-- @since 0.1.0
|
||||
@ -460,20 +434,14 @@ foldl = foldx
|
||||
-- @since 0.2.0
|
||||
{-# INLINE foldl' #-}
|
||||
foldl' :: Monad m => (b -> a -> b) -> b -> SerialT m a -> m b
|
||||
foldl' step begin m = foldx step begin id m
|
||||
foldl' step begin m = D.foldl' step begin $ D.fromStreamK (toStream m)
|
||||
|
||||
-- XXX replace the recursive "go" with explicit continuations.
|
||||
-- | Like 'foldx', but with a monadic step function.
|
||||
--
|
||||
-- @since 0.2.0
|
||||
foldxM :: Monad m => (x -> a -> m x) -> m x -> (x -> m b) -> SerialT m a -> m b
|
||||
foldxM step begin done m = go begin (toStream m)
|
||||
where
|
||||
go !acc m1 =
|
||||
let stop = acc >>= done
|
||||
single a = acc >>= \b -> step b a >>= done
|
||||
yieldk a r = acc >>= \b -> go (step b a) r
|
||||
in (K.unStream m1) Nothing stop single yieldk
|
||||
foldxM = K.foldxM
|
||||
|
||||
-- |
|
||||
-- @since 0.1.0
|
||||
@ -485,7 +453,7 @@ foldlM = foldxM
|
||||
--
|
||||
-- @since 0.2.0
|
||||
foldlM' :: Monad m => (b -> a -> m b) -> b -> SerialT m a -> m b
|
||||
foldlM' step begin m = foldxM step (return begin) return m
|
||||
foldlM' step begin m = D.foldlM' step begin $ D.fromStreamK (toStream m)
|
||||
|
||||
-- | Decompose a stream into its head and tail. If the stream is empty, returns
|
||||
-- 'Nothing'. If the stream is non-empty, returns @Just (a, ma)@, where @a@ is
|
||||
@ -495,7 +463,7 @@ foldlM' step begin m = foldxM step (return begin) return m
|
||||
uncons :: (IsStream t, Monad m) => SerialT m a -> m (Maybe (a, t m a))
|
||||
uncons m =
|
||||
let stop = return Nothing
|
||||
single a = return (Just (a, nil))
|
||||
single a = return (Just (a, K.nil))
|
||||
yieldk a r = return (Just (a, fromStream r))
|
||||
in (K.unStream (toStream m)) Nothing stop single yieldk
|
||||
|
||||
@ -518,9 +486,9 @@ toHandle h m = go (toStream m)
|
||||
-- | Convert a stream into a list in the underlying monad.
|
||||
--
|
||||
-- @since 0.1.0
|
||||
{-# INLINABLE toList #-}
|
||||
{-# INLINE toList #-}
|
||||
toList :: Monad m => SerialT m a -> m [a]
|
||||
toList = foldrM (\a xs -> return (a : xs)) []
|
||||
toList m = D.toList $ D.fromStreamK (toStream m)
|
||||
|
||||
-- | Take first 'n' elements from the stream and discard the rest.
|
||||
--
|
||||
@ -644,7 +612,7 @@ head m =
|
||||
tail :: (IsStream t, Monad m) => SerialT m a -> m (Maybe (t m a))
|
||||
tail m =
|
||||
let stop = return Nothing
|
||||
single _ = return $ Just nil
|
||||
single _ = return $ Just K.nil
|
||||
yieldk _ r = return $ Just $ fromStream r
|
||||
in (K.unStream (toStream m)) Nothing stop single yieldk
|
||||
|
||||
@ -653,7 +621,7 @@ tail m =
|
||||
-- @since 0.1.1
|
||||
{-# INLINE last #-}
|
||||
last :: Monad m => SerialT m a -> m (Maybe a)
|
||||
last = foldl (\_ y -> Just y) Nothing id
|
||||
last m = D.last $ D.fromStreamK (toStream m)
|
||||
|
||||
-- | Determine whether the stream is empty.
|
||||
--
|
||||
@ -761,14 +729,14 @@ maximum m = go Nothing (toStream m)
|
||||
-- /Concurrent (do not use with 'parallely' on infinite streams)/
|
||||
--
|
||||
-- @since 0.1.0
|
||||
{-# INLINE mapM #-}
|
||||
{-# INLINE_EARLY mapM #-}
|
||||
mapM :: (IsStream t, MonadAsync m) => (a -> m b) -> t m a -> t m b
|
||||
mapM f m = go (toStream m)
|
||||
where
|
||||
go m1 = fromStream $ K.Stream $ \svr stp sng yld ->
|
||||
let single a = f a >>= sng
|
||||
yieldk a r = K.unStream (toStream (f a |: (go r))) svr stp sng yld
|
||||
in (K.unStream m1) Nothing stp single yieldk
|
||||
mapM = K.mapM
|
||||
|
||||
{-# RULES "mapM serial" mapM = mapMSerial #-}
|
||||
{-# INLINE mapMSerial #-}
|
||||
mapMSerial :: Monad m => (a -> m b) -> SerialT m a -> SerialT m b
|
||||
mapMSerial = Serial.mapM
|
||||
|
||||
-- | Map a 'Maybe' returning function to a stream, filter out the 'Nothing'
|
||||
-- elements, and return a stream of values extracted from 'Just'.
|
||||
@ -802,14 +770,9 @@ mapMaybeM f = fmap fromJust . filter isJust . mapM f
|
||||
-- output of the action.
|
||||
--
|
||||
-- @since 0.1.0
|
||||
{-# INLINE mapM_ #-}
|
||||
mapM_ :: Monad m => (a -> m b) -> SerialT m a -> m ()
|
||||
mapM_ f m = go (toStream m)
|
||||
where
|
||||
go m1 =
|
||||
let stop = return ()
|
||||
single a = void (f a)
|
||||
yieldk a r = f a >> go r
|
||||
in (K.unStream m1) Nothing stop single yieldk
|
||||
mapM_ f m = D.mapM_ f $ D.fromStreamK (toStream m)
|
||||
|
||||
-- | Reduce a stream of monadic actions to a stream of the output of those
|
||||
-- actions.
|
||||
|
@ -44,13 +44,16 @@ import Data.Semigroup (Semigroup(..))
|
||||
import qualified Data.Heap as H
|
||||
|
||||
import Streamly.Streams.SVar (fromSVar)
|
||||
import Streamly.Streams.StreamK
|
||||
import Streamly.Streams.Serial (map)
|
||||
import Streamly.SVar
|
||||
import Streamly.Streams.StreamK (IsStream(..), Stream(..))
|
||||
import qualified Streamly.Streams.StreamK as K
|
||||
|
||||
#ifdef DIAGNOSTICS
|
||||
import Control.Monad (when)
|
||||
import Data.IORef (writeIORef, readIORef)
|
||||
#endif
|
||||
import Prelude hiding (map)
|
||||
|
||||
#include "Instances.hs"
|
||||
|
||||
@ -128,7 +131,7 @@ workLoopAhead sv q heap = runHeap
|
||||
else liftIO $ sendStop sv
|
||||
|
||||
singleToHeap seqNo a = toHeap seqNo (AheadEntryPure a)
|
||||
yieldToHeap seqNo a r = toHeap seqNo (AheadEntryStream (a `cons` r))
|
||||
yieldToHeap seqNo a r = toHeap seqNo (AheadEntryStream (a `K.cons` r))
|
||||
|
||||
singleOutput seqNo a = do
|
||||
continue <- liftIO $ send sv (ChildYield a)
|
||||
@ -242,7 +245,7 @@ aheadS m1 m2 = Stream $ \svr stp sng yld -> do
|
||||
-- of combining streams using ahead.
|
||||
{-# INLINE consMAhead #-}
|
||||
consMAhead :: MonadAsync m => m a -> Stream m a -> Stream m a
|
||||
consMAhead m r = yieldM m `aheadS` r
|
||||
consMAhead m r = K.yieldM m `aheadS` r
|
||||
|
||||
------------------------------------------------------------------------------
|
||||
-- AheadT
|
||||
@ -293,7 +296,7 @@ consMAhead m r = yieldM m `aheadS` r
|
||||
--
|
||||
-- @since 0.3.0
|
||||
newtype AheadT m a = AheadT {getAheadT :: Stream m a}
|
||||
deriving (Functor, MonadTrans)
|
||||
deriving (MonadTrans)
|
||||
|
||||
-- | A serial IO stream of elements of type @a@ with concurrent lookahead. See
|
||||
-- 'AheadT' documentation for more details.
|
||||
@ -305,7 +308,7 @@ type Ahead a = AheadT IO a
|
||||
--
|
||||
-- @since 0.3.0
|
||||
aheadly :: IsStream t => AheadT m a -> t m a
|
||||
aheadly = adapt
|
||||
aheadly = K.adapt
|
||||
|
||||
instance IsStream AheadT where
|
||||
toStream = getAheadT
|
||||
@ -339,7 +342,7 @@ instance MonadAsync m => Semigroup (AheadT m a) where
|
||||
------------------------------------------------------------------------------
|
||||
|
||||
instance MonadAsync m => Monoid (AheadT m a) where
|
||||
mempty = nil
|
||||
mempty = K.nil
|
||||
mappend = (<>)
|
||||
|
||||
------------------------------------------------------------------------------
|
||||
|
@ -31,7 +31,6 @@ module Streamly.Streams.Async
|
||||
, WAsync
|
||||
, wAsyncly
|
||||
, wAsync
|
||||
, runAsyncT -- deprecated
|
||||
)
|
||||
where
|
||||
|
||||
@ -49,11 +48,14 @@ import Data.IORef (IORef, newIORef, readIORef)
|
||||
import Data.Maybe (fromJust)
|
||||
import Data.Semigroup (Semigroup(..))
|
||||
|
||||
import Prelude hiding (map)
|
||||
import qualified Data.Set as S
|
||||
|
||||
import Streamly.Streams.SVar (fromSVar)
|
||||
import Streamly.Streams.StreamK
|
||||
import Streamly.Streams.Serial (map)
|
||||
import Streamly.SVar
|
||||
import Streamly.Streams.StreamK (IsStream(..), Stream(..), adapt)
|
||||
import qualified Streamly.Streams.StreamK as K
|
||||
|
||||
#include "Instances.hs"
|
||||
|
||||
@ -323,7 +325,7 @@ async m1 m2 = fromStream $
|
||||
-- of combining streams using async.
|
||||
{-# INLINE consMAsync #-}
|
||||
consMAsync :: MonadAsync m => m a -> Stream m a -> Stream m a
|
||||
consMAsync m r = yieldM m `asyncS` r
|
||||
consMAsync m r = K.yieldM m `asyncS` r
|
||||
|
||||
------------------------------------------------------------------------------
|
||||
-- AsyncT
|
||||
@ -379,7 +381,7 @@ consMAsync m r = yieldM m `asyncS` r
|
||||
--
|
||||
-- @since 0.1.0
|
||||
newtype AsyncT m a = AsyncT {getAsyncT :: Stream m a}
|
||||
deriving (Functor, MonadTrans)
|
||||
deriving (MonadTrans)
|
||||
|
||||
-- | A demand driven left biased parallely composing IO stream of elements of
|
||||
-- type @a@. See 'AsyncT' documentation for more details.
|
||||
@ -417,7 +419,7 @@ instance MonadAsync m => Semigroup (AsyncT m a) where
|
||||
------------------------------------------------------------------------------
|
||||
|
||||
instance MonadAsync m => Monoid (AsyncT m a) where
|
||||
mempty = nil
|
||||
mempty = K.nil
|
||||
mappend = (<>)
|
||||
|
||||
------------------------------------------------------------------------------
|
||||
@ -426,7 +428,7 @@ instance MonadAsync m => Monoid (AsyncT m a) where
|
||||
|
||||
instance MonadAsync m => Monad (AsyncT m) where
|
||||
return = pure
|
||||
(AsyncT m) >>= f = AsyncT $ bindWith asyncS m (getAsyncT . f)
|
||||
(AsyncT m) >>= f = AsyncT $ K.bindWith asyncS m (getAsyncT . f)
|
||||
|
||||
------------------------------------------------------------------------------
|
||||
-- Other instances
|
||||
@ -447,7 +449,7 @@ wAsyncS = joinStreamVarAsync WAsyncVar
|
||||
-- of combining streams using wAsync.
|
||||
{-# INLINE consMWAsync #-}
|
||||
consMWAsync :: MonadAsync m => m a -> Stream m a -> Stream m a
|
||||
consMWAsync m r = yieldM m `wAsyncS` r
|
||||
consMWAsync m r = K.yieldM m `wAsyncS` r
|
||||
|
||||
-- | Polymorphic version of the 'Semigroup' operation '<>' of 'WAsyncT'.
|
||||
-- Merges two streams concurrently choosing elements from both fairly.
|
||||
@ -505,7 +507,7 @@ wAsync m1 m2 = fromStream $ wAsyncS (toStream m1) (toStream m2)
|
||||
--
|
||||
-- @since 0.2.0
|
||||
newtype WAsyncT m a = WAsyncT {getWAsyncT :: Stream m a}
|
||||
deriving (Functor, MonadTrans)
|
||||
deriving (MonadTrans)
|
||||
|
||||
-- | A round robin parallely composing IO stream of elements of type @a@.
|
||||
-- See 'WAsyncT' documentation for more details.
|
||||
@ -543,7 +545,7 @@ instance MonadAsync m => Semigroup (WAsyncT m a) where
|
||||
------------------------------------------------------------------------------
|
||||
|
||||
instance MonadAsync m => Monoid (WAsyncT m a) where
|
||||
mempty = nil
|
||||
mempty = K.nil
|
||||
mappend = (<>)
|
||||
|
||||
------------------------------------------------------------------------------
|
||||
@ -553,7 +555,7 @@ instance MonadAsync m => Monoid (WAsyncT m a) where
|
||||
instance MonadAsync m => Monad (WAsyncT m) where
|
||||
return = pure
|
||||
(WAsyncT m) >>= f =
|
||||
WAsyncT $ bindWith wAsyncS m (getWAsyncT . f)
|
||||
WAsyncT $ K.bindWith wAsyncS m (getWAsyncT . f)
|
||||
|
||||
------------------------------------------------------------------------------
|
||||
-- Other instances
|
||||
@ -561,10 +563,3 @@ instance MonadAsync m => Monad (WAsyncT m) where
|
||||
|
||||
MONAD_APPLICATIVE_INSTANCE(WAsyncT,MONADPARALLEL)
|
||||
MONAD_COMMON_INSTANCES(WAsyncT, MONADPARALLEL)
|
||||
|
||||
-- | Same as @runStream . asyncly@.
|
||||
--
|
||||
-- @since 0.1.0
|
||||
{-# DEPRECATED runAsyncT "Please use 'runStream . asyncly' instead." #-}
|
||||
runAsyncT :: Monad m => AsyncT m a -> m ()
|
||||
runAsyncT = runStream
|
||||
|
@ -9,10 +9,13 @@
|
||||
|
||||
#define MONAD_APPLICATIVE_INSTANCE(STREAM,CONSTRAINT) \
|
||||
instance (Monad m CONSTRAINT) => Applicative (STREAM m) where { \
|
||||
pure = STREAM . yield; \
|
||||
pure = STREAM . K.yield; \
|
||||
(<*>) = ap }
|
||||
|
||||
#define MONAD_COMMON_INSTANCES(STREAM,CONSTRAINT) \
|
||||
instance Monad m => Functor (STREAM m) where { \
|
||||
fmap = map }; \
|
||||
\
|
||||
instance (MonadBase b m, Monad m CONSTRAINT) => MonadBase b (STREAM m) where {\
|
||||
liftBase = liftBaseDefault }; \
|
||||
\
|
||||
@ -31,7 +34,7 @@ instance (MonadError e m CONSTRAINT) => MonadError e (STREAM m) where { \
|
||||
\
|
||||
instance (MonadReader r m CONSTRAINT) => MonadReader r (STREAM m) where { \
|
||||
ask = lift ask; \
|
||||
local f m = fromStream $ withLocal f (toStream m) }; \
|
||||
local f m = fromStream $ K.withLocal f (toStream m) }; \
|
||||
\
|
||||
instance (MonadState s m CONSTRAINT) => MonadState s (STREAM m) where { \
|
||||
get = lift get; \
|
||||
|
@ -24,7 +24,6 @@ module Streamly.Streams.Parallel
|
||||
, Parallel
|
||||
, parallely
|
||||
, parallel
|
||||
, runParallelT -- deprecated
|
||||
|
||||
-- * Function application
|
||||
, mkParallel
|
||||
@ -45,10 +44,13 @@ import Control.Monad.State.Class (MonadState(..))
|
||||
import Control.Monad.Trans.Class (MonadTrans(lift))
|
||||
import Data.Functor (void)
|
||||
import Data.Semigroup (Semigroup(..))
|
||||
import Prelude hiding (map)
|
||||
|
||||
import Streamly.Streams.SVar (fromSVar)
|
||||
import Streamly.Streams.StreamK
|
||||
import Streamly.Streams.Serial (map)
|
||||
import Streamly.SVar
|
||||
import Streamly.Streams.StreamK (IsStream(..), Stream(..), adapt)
|
||||
import qualified Streamly.Streams.StreamK as K
|
||||
|
||||
#include "Instances.hs"
|
||||
|
||||
@ -96,7 +98,7 @@ parallelStream = joinStreamVarPar ParallelVar
|
||||
-- of combining streams using parallel.
|
||||
{-# INLINE consMParallel #-}
|
||||
consMParallel :: MonadAsync m => m a -> Stream m a -> Stream m a
|
||||
consMParallel m r = yieldM m `parallelStream` r
|
||||
consMParallel m r = K.yieldM m `parallelStream` r
|
||||
|
||||
-- | Polymorphic version of the 'Semigroup' operation '<>' of 'ParallelT'
|
||||
-- Merges two streams concurrently.
|
||||
@ -307,7 +309,7 @@ x |&. f = f |$. x
|
||||
--
|
||||
-- @since 0.1.0
|
||||
newtype ParallelT m a = ParallelT {getParallelT :: Stream m a}
|
||||
deriving (Functor, MonadTrans)
|
||||
deriving (MonadTrans)
|
||||
|
||||
-- | A parallely composing IO stream of elements of type @a@.
|
||||
-- See 'ParallelT' documentation for more details.
|
||||
@ -345,7 +347,7 @@ instance MonadAsync m => Semigroup (ParallelT m a) where
|
||||
------------------------------------------------------------------------------
|
||||
|
||||
instance MonadAsync m => Monoid (ParallelT m a) where
|
||||
mempty = nil
|
||||
mempty = K.nil
|
||||
mappend = (<>)
|
||||
|
||||
------------------------------------------------------------------------------
|
||||
@ -355,7 +357,7 @@ instance MonadAsync m => Monoid (ParallelT m a) where
|
||||
instance MonadAsync m => Monad (ParallelT m) where
|
||||
return = pure
|
||||
(ParallelT m) >>= f
|
||||
= ParallelT $ bindWith parallelStream m (getParallelT . f)
|
||||
= ParallelT $ K.bindWith parallelStream m (getParallelT . f)
|
||||
|
||||
------------------------------------------------------------------------------
|
||||
-- Other instances
|
||||
@ -363,10 +365,3 @@ instance MonadAsync m => Monad (ParallelT m) where
|
||||
|
||||
MONAD_APPLICATIVE_INSTANCE(ParallelT,MONADPARALLEL)
|
||||
MONAD_COMMON_INSTANCES(ParallelT, MONADPARALLEL)
|
||||
|
||||
-- | Same as @runStream . parallely@.
|
||||
--
|
||||
-- @since 0.1.0
|
||||
{-# DEPRECATED runParallelT "Please use 'runStream . parallely' instead." #-}
|
||||
runParallelT :: Monad m => ParallelT m a -> m ()
|
||||
runParallelT = runStream
|
||||
|
@ -29,7 +29,13 @@ module Streamly.Streams.Prelude
|
||||
(
|
||||
-- * Elimination
|
||||
runStream
|
||||
, runStreaming -- deprecated
|
||||
, runStreaming -- deprecated
|
||||
, runStreamT -- deprecated
|
||||
, runInterleavedT -- deprecated
|
||||
, runParallelT -- deprecated
|
||||
, runAsyncT -- deprecated
|
||||
, runZipStream -- deprecated
|
||||
, runZipAsync -- deprecated
|
||||
|
||||
-- * Fold Utilities
|
||||
, foldWith
|
||||
@ -38,8 +44,12 @@ module Streamly.Streams.Prelude
|
||||
)
|
||||
where
|
||||
|
||||
import Streamly.Streams.Serial (SerialT)
|
||||
import Streamly.Streams.StreamK hiding (runStream)
|
||||
import Streamly.Streams.StreamK (IsStream(..))
|
||||
import Streamly.Streams.Serial (SerialT, WSerialT)
|
||||
import Streamly.Streams.Parallel (ParallelT)
|
||||
import Streamly.Streams.Async (AsyncT)
|
||||
import Streamly.Streams.Zip (ZipSerialM, ZipAsyncM)
|
||||
|
||||
import qualified Streamly.Streams.StreamD as D
|
||||
import qualified Streamly.Streams.StreamK as K
|
||||
|
||||
@ -63,7 +73,49 @@ runStream m = D.runStream $ D.fromStreamK (toStream m)
|
||||
-- @since 0.1.0
|
||||
{-# DEPRECATED runStreaming "Please use runStream instead." #-}
|
||||
runStreaming :: (Monad m, IsStream t) => t m a -> m ()
|
||||
runStreaming = runStream . adapt
|
||||
runStreaming = runStream . K.adapt
|
||||
|
||||
-- | Same as @runStream@.
|
||||
--
|
||||
-- @since 0.1.0
|
||||
{-# DEPRECATED runStreamT "Please use runStream instead." #-}
|
||||
runStreamT :: Monad m => SerialT m a -> m ()
|
||||
runStreamT = runStream
|
||||
|
||||
-- | Same as @runStream . wSerially@.
|
||||
--
|
||||
-- @since 0.1.0
|
||||
{-# DEPRECATED runInterleavedT "Please use 'runStream . interleaving' instead." #-}
|
||||
runInterleavedT :: Monad m => WSerialT m a -> m ()
|
||||
runInterleavedT = runStream . K.adapt
|
||||
|
||||
-- | Same as @runStream . parallely@.
|
||||
--
|
||||
-- @since 0.1.0
|
||||
{-# DEPRECATED runParallelT "Please use 'runStream . parallely' instead." #-}
|
||||
runParallelT :: Monad m => ParallelT m a -> m ()
|
||||
runParallelT = runStream . K.adapt
|
||||
|
||||
-- | Same as @runStream . asyncly@.
|
||||
--
|
||||
-- @since 0.1.0
|
||||
{-# DEPRECATED runAsyncT "Please use 'runStream . asyncly' instead." #-}
|
||||
runAsyncT :: Monad m => AsyncT m a -> m ()
|
||||
runAsyncT = runStream . K.adapt
|
||||
|
||||
-- | Same as @runStream . zipping@.
|
||||
--
|
||||
-- @since 0.1.0
|
||||
{-# DEPRECATED runZipStream "Please use 'runStream . zipSerially instead." #-}
|
||||
runZipStream :: Monad m => ZipSerialM m a -> m ()
|
||||
runZipStream = runStream . K.adapt
|
||||
|
||||
-- | Same as @runStream . zippingAsync@.
|
||||
--
|
||||
-- @since 0.1.0
|
||||
{-# DEPRECATED runZipAsync "Please use 'runStream . zipAsyncly instead." #-}
|
||||
runZipAsync :: Monad m => ZipAsyncM m a -> m ()
|
||||
runZipAsync = runStream . K.adapt
|
||||
|
||||
------------------------------------------------------------------------------
|
||||
-- Fold Utilities
|
||||
@ -78,7 +130,7 @@ runStreaming = runStream . adapt
|
||||
{-# INLINABLE foldWith #-}
|
||||
foldWith :: (IsStream t, Foldable f)
|
||||
=> (t m a -> t m a -> t m a) -> f (t m a) -> t m a
|
||||
foldWith f = foldr f nil
|
||||
foldWith f = foldr f K.nil
|
||||
|
||||
-- | A variant of 'foldMap' that allows you to map a monadic streaming action
|
||||
-- on a 'Foldable' container and then fold it using the specified stream sum
|
||||
@ -90,7 +142,7 @@ foldWith f = foldr f nil
|
||||
{-# INLINABLE foldMapWith #-}
|
||||
foldMapWith :: (IsStream t, Foldable f)
|
||||
=> (t m b -> t m b -> t m b) -> (a -> t m b) -> f a -> t m b
|
||||
foldMapWith f g = foldr (f . g) nil
|
||||
foldMapWith f g = foldr (f . g) K.nil
|
||||
|
||||
-- | Like 'foldMapWith' but with the last two arguments reversed i.e. the
|
||||
-- monadic streaming function is the last argument.
|
||||
@ -99,4 +151,4 @@ foldMapWith f g = foldr (f . g) nil
|
||||
{-# INLINABLE forEachWith #-}
|
||||
forEachWith :: (IsStream t, Foldable f)
|
||||
=> (t m b -> t m b -> t m b) -> f a -> (a -> t m b) -> t m b
|
||||
forEachWith f xs g = foldr (f . g) nil xs
|
||||
forEachWith f xs g = foldr (f . g) K.nil xs
|
||||
|
@ -36,9 +36,9 @@ module Streamly.Streams.Serial
|
||||
, wSerially
|
||||
, interleaving -- deprecated
|
||||
|
||||
-- * Running Streams
|
||||
, runStreamT -- deprecated
|
||||
, runInterleavedT -- deprecated
|
||||
-- * Transformation
|
||||
, map
|
||||
, mapM
|
||||
)
|
||||
where
|
||||
|
||||
@ -51,11 +51,14 @@ import Control.Monad.Reader.Class (MonadReader(..))
|
||||
import Control.Monad.State.Class (MonadState(..))
|
||||
import Control.Monad.Trans.Class (MonadTrans(lift))
|
||||
import Data.Semigroup (Semigroup(..))
|
||||
import Prelude hiding (map, mapM)
|
||||
|
||||
import Streamly.Streams.StreamK hiding (serial)
|
||||
import Streamly.Streams.StreamK (IsStream(..), adapt, Stream(..))
|
||||
import qualified Streamly.Streams.StreamK as K
|
||||
import qualified Streamly.Streams.StreamD as D
|
||||
|
||||
#include "Instances.hs"
|
||||
#include "inline.h"
|
||||
|
||||
------------------------------------------------------------------------------
|
||||
-- SerialT
|
||||
@ -118,7 +121,7 @@ import qualified Streamly.Streams.StreamK as K
|
||||
--
|
||||
-- @since 0.2.0
|
||||
newtype SerialT m a = SerialT {getSerialT :: Stream m a}
|
||||
deriving (Semigroup, Monoid, Functor, MonadTrans)
|
||||
deriving (Semigroup, Monoid, MonadTrans)
|
||||
|
||||
-- | A serial IO stream of elements of type @a@. See 'SerialT' documentation
|
||||
-- for more details.
|
||||
@ -144,12 +147,12 @@ instance IsStream SerialT where
|
||||
{-# INLINE consM #-}
|
||||
{-# SPECIALIZE consM :: IO a -> SerialT IO a -> SerialT IO a #-}
|
||||
consM :: Monad m => m a -> SerialT m a -> SerialT m a
|
||||
consM m r = fromStream $ consMSerial m (toStream r)
|
||||
consM m r = fromStream $ K.consMSerial m (toStream r)
|
||||
|
||||
{-# INLINE (|:) #-}
|
||||
{-# SPECIALIZE (|:) :: IO a -> SerialT IO a -> SerialT IO a #-}
|
||||
(|:) :: Monad m => m a -> SerialT m a -> SerialT m a
|
||||
m |: r = fromStream $ consMSerial m (toStream r)
|
||||
m |: r = fromStream $ K.consMSerial m (toStream r)
|
||||
|
||||
------------------------------------------------------------------------------
|
||||
-- Semigroup
|
||||
@ -180,6 +183,14 @@ instance Monad m => Monad (SerialT m) where
|
||||
-- Other instances
|
||||
------------------------------------------------------------------------------
|
||||
|
||||
{-# INLINE_EARLY mapM #-}
|
||||
mapM :: (IsStream t, Monad m) => (a -> m b) -> t m a -> t m b
|
||||
mapM f m = fromStream $ D.toStreamK $ D.mapM f $ D.fromStreamK (toStream m)
|
||||
|
||||
{-# INLINE map #-}
|
||||
map :: (IsStream t, Monad m) => (a -> b) -> t m a -> t m b
|
||||
map f = mapM (return . f)
|
||||
|
||||
MONAD_APPLICATIVE_INSTANCE(SerialT,)
|
||||
MONAD_COMMON_INSTANCES(SerialT,)
|
||||
|
||||
@ -225,7 +236,7 @@ MONAD_COMMON_INSTANCES(SerialT,)
|
||||
--
|
||||
-- @since 0.2.0
|
||||
newtype WSerialT m a = WSerialT {getWSerialT :: Stream m a}
|
||||
deriving (Functor, MonadTrans)
|
||||
deriving (MonadTrans)
|
||||
|
||||
-- | An interleaving serial IO stream of elements of type @a@. See 'WSerialT'
|
||||
-- documentation for more details.
|
||||
@ -258,12 +269,12 @@ instance IsStream WSerialT where
|
||||
{-# INLINE consM #-}
|
||||
{-# SPECIALIZE consM :: IO a -> WSerialT IO a -> WSerialT IO a #-}
|
||||
consM :: Monad m => m a -> WSerialT m a -> WSerialT m a
|
||||
consM m r = fromStream $ consMSerial m (toStream r)
|
||||
consM m r = fromStream $ K.consMSerial m (toStream r)
|
||||
|
||||
{-# INLINE (|:) #-}
|
||||
{-# SPECIALIZE (|:) :: IO a -> WSerialT IO a -> WSerialT IO a #-}
|
||||
(|:) :: Monad m => m a -> WSerialT m a -> WSerialT m a
|
||||
m |: r = fromStream $ consMSerial m (toStream r)
|
||||
m |: r = fromStream $ K.consMSerial m (toStream r)
|
||||
|
||||
------------------------------------------------------------------------------
|
||||
-- Semigroup
|
||||
@ -303,7 +314,7 @@ infixr 5 <=>
|
||||
------------------------------------------------------------------------------
|
||||
|
||||
instance Monoid (WSerialT m a) where
|
||||
mempty = nil
|
||||
mempty = K.nil
|
||||
mappend = (<>)
|
||||
|
||||
------------------------------------------------------------------------------
|
||||
@ -324,21 +335,3 @@ instance Monad m => Monad (WSerialT m) where
|
||||
|
||||
MONAD_APPLICATIVE_INSTANCE(WSerialT,)
|
||||
MONAD_COMMON_INSTANCES(WSerialT,)
|
||||
|
||||
-------------------------------------------------------------------------------
|
||||
-- Running Streams
|
||||
-------------------------------------------------------------------------------
|
||||
|
||||
-- | Same as @runStream@.
|
||||
--
|
||||
-- @since 0.1.0
|
||||
{-# DEPRECATED runStreamT "Please use runStream instead." #-}
|
||||
runStreamT :: Monad m => SerialT m a -> m ()
|
||||
runStreamT = runStream
|
||||
|
||||
-- | Same as @runStream . wSerially@.
|
||||
--
|
||||
-- @since 0.1.0
|
||||
{-# DEPRECATED runInterleavedT "Please use 'runStream . interleaving' instead." #-}
|
||||
runInterleavedT :: Monad m => InterleavedT m a -> m ()
|
||||
runInterleavedT = runStream
|
||||
|
@ -34,24 +34,40 @@ module Streamly.Streams.StreamD
|
||||
Step (..)
|
||||
, Stream (..)
|
||||
|
||||
-- * Elimination
|
||||
, runStream
|
||||
, uncons
|
||||
|
||||
-- * Generation
|
||||
-- * Construction
|
||||
, nil
|
||||
, yield
|
||||
, yieldM
|
||||
|
||||
-- * Generation by Unfolding
|
||||
, unfoldr
|
||||
, unfoldrM
|
||||
|
||||
-- * Special Generation
|
||||
, repeat
|
||||
, enumFromStepN
|
||||
, fromList
|
||||
, fromListM
|
||||
|
||||
-- * Transformation
|
||||
, mapM
|
||||
-- * Deconstruction
|
||||
, uncons
|
||||
|
||||
-- * Elimination by Folding
|
||||
-- ** General Folds
|
||||
, foldr
|
||||
, foldrM
|
||||
, foldl'
|
||||
, foldlM'
|
||||
|
||||
-- ** Special Folds
|
||||
, runStream
|
||||
, mapM_
|
||||
, toList
|
||||
, last
|
||||
|
||||
-- * Mapping
|
||||
, map
|
||||
, mapM
|
||||
|
||||
-- * Conversion
|
||||
, toStreamK
|
||||
@ -59,9 +75,11 @@ module Streamly.Streams.StreamD
|
||||
)
|
||||
where
|
||||
|
||||
import GHC.Types ( SPEC(..) )
|
||||
import Prelude hiding (map, mapM, mapM_, repeat, foldr, last)
|
||||
|
||||
import Streamly.SVar (MonadAsync)
|
||||
import qualified Streamly.Streams.StreamK as K
|
||||
import Prelude hiding (mapM, mapM_, repeat)
|
||||
|
||||
------------------------------------------------------------------------------
|
||||
-- The direct style stream type
|
||||
@ -208,6 +226,14 @@ mapM f (Stream step state) = Stream step' state
|
||||
Yield x s -> f x >>= \a -> return $ Yield a s
|
||||
Stop -> return Stop
|
||||
|
||||
{-# INLINE map #-}
|
||||
map :: Monad m => (a -> b) -> Stream m a -> Stream m b
|
||||
map f = mapM (return . f)
|
||||
|
||||
instance Monad m => Functor (Stream m) where
|
||||
{-# INLINE fmap #-}
|
||||
fmap = map
|
||||
|
||||
-------------------------------------------------------------------------------
|
||||
-- Elimination
|
||||
-------------------------------------------------------------------------------
|
||||
@ -225,15 +251,65 @@ uncons (Stream step state) = go state
|
||||
-- | Run a streaming composition, discard the results.
|
||||
{-# INLINE_LATE runStream #-}
|
||||
runStream :: Monad m => Stream m a -> m ()
|
||||
runStream (Stream step state) = go state
|
||||
runStream (Stream step state) = go SPEC state
|
||||
where
|
||||
go st = do
|
||||
go !_ st = do
|
||||
r <- step st
|
||||
case r of
|
||||
Yield _ s -> go s
|
||||
Yield _ s -> go SPEC s
|
||||
Stop -> return ()
|
||||
|
||||
-- | Execute a monadic action for each element of the 'Stream'
|
||||
{-# INLINE_NORMAL mapM_ #-}
|
||||
mapM_ :: Monad m => (a -> m b) -> Stream m a -> m ()
|
||||
mapM_ m = runStream . mapM m
|
||||
|
||||
{-# INLINE_NORMAL foldrM #-}
|
||||
foldrM :: Monad m => (a -> b -> m b) -> b -> Stream m a -> m b
|
||||
foldrM f z (Stream step state) = go SPEC state
|
||||
where
|
||||
go !_ st = do
|
||||
r <- step st
|
||||
case r of
|
||||
Yield x s -> go SPEC s >>= f x
|
||||
Stop -> return z
|
||||
|
||||
{-# INLINE_NORMAL foldr #-}
|
||||
foldr :: Monad m => (a -> b -> b) -> b -> Stream m a -> m b
|
||||
foldr f = foldrM (\a b -> return (f a b))
|
||||
|
||||
{-# INLINE_NORMAL foldlM' #-}
|
||||
foldlM' :: Monad m => (b -> a -> m b) -> b -> Stream m a -> m b
|
||||
foldlM' fstep begin (Stream step state) = go SPEC begin state
|
||||
where
|
||||
go !_ acc st = acc `seq` do
|
||||
r <- step st
|
||||
case r of
|
||||
Yield x s -> do
|
||||
acc' <- fstep acc x
|
||||
go SPEC acc' s
|
||||
Stop -> return acc
|
||||
|
||||
{-# INLINE foldl' #-}
|
||||
foldl' :: Monad m => (b -> a -> b) -> b -> Stream m a -> m b
|
||||
foldl' fstep = foldlM' (\b a -> return (fstep b a))
|
||||
|
||||
{-# INLINE toList #-}
|
||||
toList :: Monad m => Stream m a -> m [a]
|
||||
toList = foldr (:) []
|
||||
|
||||
{-# INLINE_NORMAL last #-}
|
||||
last :: Monad m => Stream m a -> m (Maybe a)
|
||||
last (Stream step state) = go0 SPEC state
|
||||
where
|
||||
go0 !_ st = do
|
||||
r <- step st
|
||||
case r of
|
||||
Yield x s -> go1 SPEC x s
|
||||
Stop -> return Nothing
|
||||
|
||||
go1 !_ x st = do
|
||||
r <- step st
|
||||
case r of
|
||||
Yield y s -> go1 SPEC y s
|
||||
Stop -> return (Just x)
|
||||
|
@ -1,3 +1,4 @@
|
||||
{-# LANGUAGE BangPatterns #-}
|
||||
{-# LANGUAGE CPP #-}
|
||||
{-# LANGUAGE ConstraintKinds #-}
|
||||
{-# LANGUAGE FlexibleContexts #-}
|
||||
@ -34,10 +35,6 @@ module Streamly.Streams.StreamK
|
||||
-- * The stream type
|
||||
, Stream (..)
|
||||
|
||||
-- * Elimination
|
||||
, foldStream
|
||||
, runStream
|
||||
|
||||
-- * Construction
|
||||
, mkStream
|
||||
, nil
|
||||
@ -57,6 +54,24 @@ module Streamly.Streams.StreamK
|
||||
, repeat
|
||||
, fromFoldable
|
||||
|
||||
-- * Elimination
|
||||
, foldStream
|
||||
, foldr
|
||||
, foldrM
|
||||
, foldx
|
||||
, foldl'
|
||||
, foldxM
|
||||
, foldlM'
|
||||
|
||||
, runStream
|
||||
, mapM_
|
||||
, toList
|
||||
, last
|
||||
|
||||
-- * Transformation
|
||||
, map
|
||||
, mapM
|
||||
|
||||
-- * Semigroup Style Composition
|
||||
, serial
|
||||
|
||||
@ -71,10 +86,12 @@ module Streamly.Streams.StreamK
|
||||
)
|
||||
where
|
||||
|
||||
import Control.Monad (void)
|
||||
import Control.Monad.Reader.Class (MonadReader(..))
|
||||
import Control.Monad.Trans.Class (MonadTrans(lift))
|
||||
import Data.Semigroup (Semigroup(..))
|
||||
import Prelude hiding (repeat)
|
||||
import Prelude hiding (foldl, foldr, last, map, mapM, mapM_, repeat)
|
||||
import qualified Prelude
|
||||
|
||||
import Streamly.SVar
|
||||
|
||||
@ -328,6 +345,74 @@ foldStream svr blank single step m =
|
||||
let yieldk a x = step a (fromStream x)
|
||||
in (unStream (toStream m)) svr blank single yieldk
|
||||
|
||||
-- | Lazy right associative fold.
|
||||
foldr :: (IsStream t, Monad m) => (a -> b -> b) -> b -> t m a -> m b
|
||||
foldr step acc m = go (toStream m)
|
||||
where
|
||||
go m1 =
|
||||
let stop = return acc
|
||||
single a = return (step a acc)
|
||||
yieldk a r = go r >>= \b -> return (step a b)
|
||||
in (unStream m1) Nothing stop single yieldk
|
||||
|
||||
-- | Lazy right fold with a monadic step function.
|
||||
{-# INLINE foldrM #-}
|
||||
foldrM :: (IsStream t, Monad m) => (a -> b -> m b) -> b -> t m a -> m b
|
||||
foldrM step acc m = go (toStream m)
|
||||
where
|
||||
go m1 =
|
||||
let stop = return acc
|
||||
single a = step a acc
|
||||
yieldk a r = go r >>= step a
|
||||
in (unStream m1) Nothing stop single yieldk
|
||||
|
||||
-- | Strict left fold with an extraction function. Like the standard strict
|
||||
-- left fold, but applies a user supplied extraction function (the third
|
||||
-- argument) to the folded value at the end. This is designed to work with the
|
||||
-- @foldl@ library. The suffix @x@ is a mnemonic for extraction.
|
||||
{-# INLINE foldx #-}
|
||||
foldx :: (IsStream t, Monad m)
|
||||
=> (x -> a -> x) -> x -> (x -> b) -> t m a -> m b
|
||||
foldx step begin done m = get $ go (toStream m) begin
|
||||
where
|
||||
{-# NOINLINE get #-}
|
||||
get m1 =
|
||||
let single = return . done
|
||||
in (unStream m1) Nothing undefined single undefined
|
||||
|
||||
-- Note, this can be implemented by making a recursive call to "go",
|
||||
-- however that is more expensive because of unnecessary recursion
|
||||
-- that cannot be tail call optimized. Unfolding recursion explicitly via
|
||||
-- continuations is much more efficient.
|
||||
go m1 !acc = Stream $ \_ _ sng yld ->
|
||||
let stop = sng acc
|
||||
single a = sng $ step acc a
|
||||
yieldk a r =
|
||||
let stream = go r (step acc a)
|
||||
in (unStream stream) Nothing undefined sng yld
|
||||
in (unStream m1) Nothing stop single yieldk
|
||||
|
||||
-- | Strict left associative fold.
|
||||
{-# INLINE foldl' #-}
|
||||
foldl' :: (IsStream t, Monad m) => (b -> a -> b) -> b -> t m a -> m b
|
||||
foldl' step begin m = foldx step begin id m
|
||||
|
||||
-- XXX replace the recursive "go" with explicit continuations.
|
||||
-- | Like 'foldx', but with a monadic step function.
|
||||
foldxM :: (IsStream t, Monad m)
|
||||
=> (x -> a -> m x) -> m x -> (x -> m b) -> t m a -> m b
|
||||
foldxM step begin done m = go begin (toStream m)
|
||||
where
|
||||
go !acc m1 =
|
||||
let stop = acc >>= done
|
||||
single a = acc >>= \b -> step b a >>= done
|
||||
yieldk a r = acc >>= \b -> go (step b a) r
|
||||
in (unStream m1) Nothing stop single yieldk
|
||||
|
||||
-- | Like 'foldl'' but with a monadic step function.
|
||||
foldlM' :: (IsStream t, Monad m) => (b -> a -> m b) -> b -> t m a -> m b
|
||||
foldlM' step begin m = foldxM step (return begin) return m
|
||||
|
||||
runStream :: (Monad m, IsStream t) => t m a -> m ()
|
||||
runStream m = go (toStream m)
|
||||
where
|
||||
@ -337,6 +422,26 @@ runStream m = go (toStream m)
|
||||
yieldk _ r = go (toStream r)
|
||||
in (unStream m1) Nothing stop single yieldk
|
||||
|
||||
-- | Apply a monadic action to each element of the stream and discard the
|
||||
-- output of the action.
|
||||
mapM_ :: (IsStream t, Monad m) => (a -> m b) -> t m a -> m ()
|
||||
mapM_ f m = go (toStream m)
|
||||
where
|
||||
go m1 =
|
||||
let stop = return ()
|
||||
single a = void (f a)
|
||||
yieldk a r = f a >> go r
|
||||
in (unStream m1) Nothing stop single yieldk
|
||||
|
||||
{-# INLINABLE toList #-}
|
||||
toList :: (IsStream t, Monad m) => t m a -> m [a]
|
||||
toList = foldr (:) []
|
||||
|
||||
-- | Extract the last element of the stream, if any.
|
||||
{-# INLINE last #-}
|
||||
last :: (IsStream t, Monad m) => t m a -> m (Maybe a)
|
||||
last = foldx (\_ y -> Just y) Nothing id
|
||||
|
||||
-------------------------------------------------------------------------------
|
||||
-- Generation
|
||||
-------------------------------------------------------------------------------
|
||||
@ -409,11 +514,24 @@ instance Monoid (Stream m a) where
|
||||
-- Functor
|
||||
-------------------------------------------------------------------------------
|
||||
|
||||
{-# INLINE map #-}
|
||||
map :: (IsStream t, Monad m) => (a -> b) -> t m a -> t m b
|
||||
map f m = fromStream $ Stream $ \_ stp sng yld ->
|
||||
let single = sng . f
|
||||
yieldk a r = yld (f a) (fmap f r)
|
||||
in unStream (toStream m) Nothing stp single yieldk
|
||||
|
||||
instance Monad m => Functor (Stream m) where
|
||||
fmap f m = Stream $ \_ stp sng yld ->
|
||||
let single = sng . f
|
||||
yieldk a r = yld (f a) (fmap f r)
|
||||
in (unStream m) Nothing stp single yieldk
|
||||
fmap = map
|
||||
|
||||
{-# INLINE mapM #-}
|
||||
mapM :: (IsStream t, MonadAsync m) => (a -> m b) -> t m a -> t m b
|
||||
mapM f m = go (toStream m)
|
||||
where
|
||||
go m1 = fromStream $ Stream $ \svr stp sng yld ->
|
||||
let single a = f a >>= sng
|
||||
yieldk a r = unStream (toStream (f a |: (go r))) svr stp sng yld
|
||||
in (unStream m1) Nothing stp single yieldk
|
||||
|
||||
-------------------------------------------------------------------------------
|
||||
-- Bind utility
|
||||
|
@ -30,23 +30,24 @@ module Streamly.Streams.Zip
|
||||
, ZipStream -- deprecated
|
||||
, zipSerially
|
||||
, zipping -- deprecated
|
||||
, runZipStream -- deprecated
|
||||
|
||||
, ZipAsyncM
|
||||
, ZipAsync
|
||||
, zipAsyncly
|
||||
, zippingAsync -- deprecated
|
||||
, runZipAsync -- deprecated
|
||||
)
|
||||
where
|
||||
|
||||
import Data.Semigroup (Semigroup(..))
|
||||
import Prelude hiding (repeat, zipWith)
|
||||
import Prelude hiding (map, repeat, zipWith)
|
||||
|
||||
import Streamly.Streams.StreamK (IsStream(..), Stream(..))
|
||||
import Streamly.Streams.Async (mkAsync)
|
||||
import Streamly.Streams.StreamK
|
||||
import Streamly.Streams.Serial (map)
|
||||
import Streamly.SVar (MonadAsync)
|
||||
|
||||
import qualified Streamly.Streams.StreamK as K
|
||||
|
||||
#include "Instances.hs"
|
||||
|
||||
------------------------------------------------------------------------------
|
||||
@ -62,7 +63,7 @@ zipWithS f m1 m2 = go m1 m2
|
||||
let single2 b = sng (f a b)
|
||||
yield2 b rb = yld (f a b) (go ra rb)
|
||||
in unStream my Nothing stp single2 yield2
|
||||
let single1 a = merge a nil
|
||||
let single1 a = merge a K.nil
|
||||
yield1 a ra = merge a ra
|
||||
unStream mx Nothing stp single1 yield1
|
||||
|
||||
@ -85,7 +86,7 @@ zipWithM f m1 m2 = fromStream $ go (toStream m1) (toStream m2)
|
||||
single2 b = runIt $ toStream (f a b)
|
||||
yield2 b rb = runIt $ toStream (f a b) <> go ra rb
|
||||
in unStream my Nothing stp single2 yield2
|
||||
let single1 a = merge a nil
|
||||
let single1 a = merge a K.nil
|
||||
yield1 a ra = merge a ra
|
||||
unStream mx Nothing stp single1 yield1
|
||||
|
||||
@ -112,7 +113,7 @@ zipWithM f m1 m2 = fromStream $ go (toStream m1) (toStream m2)
|
||||
--
|
||||
-- @since 0.2.0
|
||||
newtype ZipSerialM m a = ZipSerialM {getZipSerialM :: Stream m a}
|
||||
deriving (Functor, Semigroup, Monoid)
|
||||
deriving (Semigroup, Monoid)
|
||||
|
||||
-- |
|
||||
-- @since 0.1.0
|
||||
@ -128,7 +129,7 @@ type ZipSerial a = ZipSerialM IO a
|
||||
--
|
||||
-- @since 0.2.0
|
||||
zipSerially :: IsStream t => ZipSerialM m a -> t m a
|
||||
zipSerially = adapt
|
||||
zipSerially = K.adapt
|
||||
|
||||
-- | Same as 'zipSerially'.
|
||||
--
|
||||
@ -144,15 +145,18 @@ instance IsStream ZipSerialM where
|
||||
{-# INLINE consM #-}
|
||||
{-# SPECIALIZE consM :: IO a -> ZipSerialM IO a -> ZipSerialM IO a #-}
|
||||
consM :: Monad m => m a -> ZipSerialM m a -> ZipSerialM m a
|
||||
consM m r = fromStream $ consMSerial m (toStream r)
|
||||
consM m r = fromStream $ K.consMSerial m (toStream r)
|
||||
|
||||
{-# INLINE (|:) #-}
|
||||
{-# SPECIALIZE (|:) :: IO a -> ZipSerialM IO a -> ZipSerialM IO a #-}
|
||||
(|:) :: Monad m => m a -> ZipSerialM m a -> ZipSerialM m a
|
||||
m |: r = fromStream $ consMSerial m (toStream r)
|
||||
m |: r = fromStream $ K.consMSerial m (toStream r)
|
||||
|
||||
instance Monad m => Functor (ZipSerialM m) where
|
||||
fmap = map
|
||||
|
||||
instance Monad m => Applicative (ZipSerialM m) where
|
||||
pure = ZipSerialM . repeat
|
||||
pure = ZipSerialM . K.repeat
|
||||
m1 <*> m2 = fromStream $ zipWith id (toStream m1) (toStream m2)
|
||||
|
||||
------------------------------------------------------------------------------
|
||||
@ -203,7 +207,7 @@ zipAsyncWithM f m1 m2 = fromStream $ Stream $ \_ stp sng yld -> do
|
||||
--
|
||||
-- @since 0.2.0
|
||||
newtype ZipAsyncM m a = ZipAsyncM {getZipAsyncM :: Stream m a}
|
||||
deriving (Functor, Semigroup, Monoid)
|
||||
deriving (Semigroup, Monoid)
|
||||
|
||||
-- | An IO stream whose applicative instance zips streams wAsyncly.
|
||||
--
|
||||
@ -214,7 +218,7 @@ type ZipAsync a = ZipAsyncM IO a
|
||||
--
|
||||
-- @since 0.2.0
|
||||
zipAsyncly :: IsStream t => ZipAsyncM m a -> t m a
|
||||
zipAsyncly = adapt
|
||||
zipAsyncly = K.adapt
|
||||
|
||||
-- | Same as 'zipAsyncly'.
|
||||
--
|
||||
@ -229,27 +233,16 @@ instance IsStream ZipAsyncM where
|
||||
{-# INLINE consM #-}
|
||||
{-# SPECIALIZE consM :: IO a -> ZipAsyncM IO a -> ZipAsyncM IO a #-}
|
||||
consM :: Monad m => m a -> ZipAsyncM m a -> ZipAsyncM m a
|
||||
consM m r = fromStream $ consMSerial m (toStream r)
|
||||
consM m r = fromStream $ K.consMSerial m (toStream r)
|
||||
|
||||
{-# INLINE (|:) #-}
|
||||
{-# SPECIALIZE (|:) :: IO a -> ZipAsyncM IO a -> ZipAsyncM IO a #-}
|
||||
(|:) :: Monad m => m a -> ZipAsyncM m a -> ZipAsyncM m a
|
||||
m |: r = fromStream $ consMSerial m (toStream r)
|
||||
m |: r = fromStream $ K.consMSerial m (toStream r)
|
||||
|
||||
instance Monad m => Functor (ZipAsyncM m) where
|
||||
fmap = map
|
||||
|
||||
instance MonadAsync m => Applicative (ZipAsyncM m) where
|
||||
pure = ZipAsyncM . repeat
|
||||
pure = ZipAsyncM . K.repeat
|
||||
m1 <*> m2 = zipAsyncWith id m1 m2
|
||||
|
||||
-- | Same as @runStream . zipping@.
|
||||
--
|
||||
-- @since 0.1.0
|
||||
{-# DEPRECATED runZipStream "Please use 'runStream . zipSerially instead." #-}
|
||||
runZipStream :: Monad m => ZipSerialM m a -> m ()
|
||||
runZipStream = runStream
|
||||
|
||||
-- | Same as @runStream . zippingAsync@.
|
||||
--
|
||||
-- @since 0.1.0
|
||||
{-# DEPRECATED runZipAsync "Please use 'runStream . zipAsyncly instead." #-}
|
||||
runZipAsync :: Monad m => ZipAsyncM m a -> m ()
|
||||
runZipAsync = runStream
|
||||
|
@ -160,6 +160,7 @@ library
|
||||
-Wnoncanonical-monadfail-instances
|
||||
|
||||
build-depends: base >= 4.8 && < 5
|
||||
, ghc-prim >= 0.2 && < 0.6
|
||||
, containers >= 0.5 && < 0.6
|
||||
, heaps >= 0.3 && < 0.4
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user