@ -18,7 +18,7 @@ import Prelude hiding ()
-- import qualified Prelude as P
-- import qualified Data.List as List
import qualified Streamly.Internal.Data.Stream.StreamDK as S
import qualified Streamly.Internal.Data.Stream.StreamK.Alt as S
-- import qualified Streamly.Internal.Data.Stream.Common as SP
-- import qualified Streamly.Internal.Data.SVar as S

@ -37,7 +37,7 @@ module Streamly.Data.Stream
-- * Construction
-- | Functions ending in the general shape @b -> Stream m a@.
-- See also: "Streamly.Internal.Data.Stream.Generate" for
-- See also: "Streamly.Internal.Data.Stream.StreamD.Generate" for
-- @Pre-release@ functions.
-- Useful Idioms:
@ -111,7 +111,7 @@ module Streamly.Data.Stream
-- | Functions ending in the general shape @Stream m a -> m b@ or @Stream m
-- a -> m (b, Stream m a)@
-- See also: "Streamly.Internal.Data.Stream.Eliminate" for @Pre-release@
-- See also: "Streamly.Internal.Data.Stream.StreamD.Eliminate" for @Pre-release@
-- functions.
-- EXPLANATION: In imperative terms a fold can be considered as a loop over the stream
@ -253,7 +253,7 @@ module Streamly.Data.Stream
-- * Scanning
-- | Stateful one-to-one transformations.
-- See also: "Streamly.Internal.Data.Stream.Transform" for
-- See also: "Streamly.Internal.Data.Stream.StreamD.Transform" for
-- @Pre-release@ functions.
@ -471,7 +471,7 @@ module Streamly.Data.Stream
-- For example, instead of calling them on a stream of chars call them on a
-- stream of arrays before flattening it to a stream of chars.
-- See also: "Streamly.Internal.Data.Stream.Exception" for
-- See also: "Streamly.Internal.Data.Stream.StreamD.Exception" for
-- @Pre-release@ functions.
, onException

View File

@ -66,13 +66,19 @@ where
import Control.Arrow (second)
import Data.Functor.Identity (Identity, runIdentity)
import GHC.Exts (IsList(..), IsString(..))
import Streamly.Internal.Data.Stream.Cross (CrossStream(..))
import Streamly.Internal.Data.Stream.Type (Stream)
import Streamly.Internal.Data.Stream.Zip (ZipStream(..))
import Text.Read (readPrec)
import Streamly.Internal.Data.Stream.StreamD.Type
( CrossStream
, mkCross
, unCross
import Streamly.Internal.Data.Stream.StreamD.Type (Stream)
import Text.Read
( Lexeme(Ident), lexP, parens, prec, readPrec, readListPrec
, readListPrecDefault)
import qualified Streamly.Internal.Data.Stream.StreamK.Type as K
import qualified Streamly.Internal.Data.Stream.Type as Stream
import qualified Streamly.Internal.Data.Stream.StreamD.Type as Stream
import qualified Streamly.Internal.Data.Stream.StreamD.Generate as Stream
-- XXX Rename to PureStream.
@ -82,21 +88,21 @@ import qualified Streamly.Internal.Data.Stream.Type as Stream
newtype List a = List { toCrossStream :: CrossStream Identity a }
( Eq, Ord
, Semigroup, Monoid, Functor, Foldable
, Applicative, Traversable, Monad, IsList)
, Functor, Foldable
, Applicative, Monad, IsList)
toStream :: List a -> Stream Identity a
toStream = unCrossStream . toCrossStream
toStream = unCross . toCrossStream
fromStream :: Stream Identity a -> List a
fromStream xs = List (CrossStream xs)
fromStream xs = List (mkCross xs)
instance (a ~ Char) => IsString (List a) where
{-# INLINE fromString #-}
fromString = List . fromList
instance Show a => Show (List a) where
show (List x) = show $ unCrossStream x
show (List x) = show $ unCross x
instance Read a => Read (List a) where
readPrec = fromStream <$> readPrec
@ -119,7 +125,7 @@ pattern Nil <- (runIdentity . K.null . Stream.toStreamK . toStream -> True)
Nil = List $ CrossStream (Stream.fromStreamK K.nil)
Nil = List $ mkCross (Stream.fromStreamK K.nil)
infixr 5 `Cons`
@ -128,17 +134,77 @@ infixr 5 `Cons`
pattern Cons :: a -> List a -> List a
pattern Cons x xs <-
(fmap (second (List . CrossStream . Stream.fromStreamK))
(fmap (second (List . mkCross . Stream.fromStreamK))
. runIdentity . K.uncons . Stream.toStreamK . toStream
-> Just (x, xs)
Cons x xs = List $ CrossStream $ Stream.cons x (toStream xs)
Cons x xs = List $ mkCross $ Stream.cons x (toStream xs)
{-# COMPLETE Nil, Cons #-}
-- ZipStream
-- $setup
-- >>> import qualified Streamly.Data.Fold as Fold
-- >>> import qualified Streamly.Data.Stream as Stream
-- >>> import qualified Streamly.Internal.Data.Stream.Zip as Stream
-- Serially Zipping Streams
-- | For 'ZipStream':
-- @
-- (<>) = 'Streamly.Data.Stream.append'
-- (\<*>) = 'Streamly.Data.Stream.zipWith' id
-- @
-- Applicative evaluates the streams being zipped serially:
-- >>> s1 = Stream.ZipStream $ Stream.fromFoldable [1, 2]
-- >>> s2 = Stream.ZipStream $ Stream.fromFoldable [3, 4]
-- >>> s3 = Stream.ZipStream $ Stream.fromFoldable [5, 6]
-- >>> s = (,,) <$> s1 <*> s2 <*> s3
-- >>> Stream.fold Fold.toList (Stream.unZipStream s)
-- [(1,3,5),(2,4,6)]
newtype ZipStream m a = ZipStream {unZipStream :: Stream m a}
deriving (Functor)
deriving instance IsList (ZipStream Identity a)
deriving instance (a ~ Char) => IsString (ZipStream Identity a)
deriving instance Eq a => Eq (ZipStream Identity a)
deriving instance Ord a => Ord (ZipStream Identity a)
deriving instance (Foldable m, Monad m) => Foldable (ZipStream m)
instance Show a => Show (ZipStream Identity a) where
showsPrec p dl = showParen (p > 10) $
showString "fromList " . shows (toList dl)
instance Read a => Read (ZipStream Identity a) where
readPrec = parens $ prec 10 $ do
Ident "fromList" <- lexP
fromList <$> readPrec
readListPrec = readListPrecDefault
type ZipSerialM = ZipStream
-- | An IO stream whose applicative instance zips streams serially.
type ZipSerial = ZipSerialM IO
instance Monad m => Applicative (ZipStream m) where
pure = ZipStream . Stream.repeat
{-# INLINE (<*>) #-}
ZipStream m1 <*> ZipStream m2 = ZipStream $ Stream.zipWith id m1 m2
-- ZipList
@ -149,8 +215,8 @@ pattern Cons x xs <-
newtype ZipList a = ZipList { toZipStream :: ZipStream Identity a }
( Show, Read, Eq, Ord
, Semigroup, Monoid, Functor, Foldable
, Applicative, Traversable, IsList
, Functor, Foldable
, Applicative, IsList
instance (a ~ Char) => IsString (ZipList a) where
@ -160,7 +226,7 @@ instance (a ~ Char) => IsString (ZipList a) where
-- | Convert a 'ZipList' to a regular 'List'
fromZipList :: ZipList a -> List a
fromZipList (ZipList zs) = List $ CrossStream (unZipStream zs)
fromZipList (ZipList zs) = List $ mkCross (unZipStream zs)
-- | Convert a regular 'List' to a 'ZipList'

ghandle handler =
. D.ghandle (\e xs -> toStreamD $ handler e (fromStreamD xs))
. toStreamD
-- | When evaluating a stream if an exception occurs, stream evaluation aborts
-- and the specified exception handler is run with the exception as argument.
-- /Inhibits stream fusion/
{-# INLINE handle #-}
handle :: (MonadCatch m, Exception e)
=> (e -> Stream m a) -> Stream m a -> Stream m a
handle handler xs =
fromStreamD $ D.handle (toStreamD . handler) $ toStreamD xs

-- |
-- Module : Streamly.Internal.Data.Stream.Expand
-- Copyright : (c) 2017 Composewell Technologies
-- License : BSD-3-Clause
-- Maintainer :
-- Stability : experimental
-- Portability : GHC
-- Expand a stream by combining two or more streams or by combining streams
-- with unfolds.
module Streamly.Internal.Data.Stream.Expand
-- * Binary Combinators (Linear)
-- | Functions ending in the shape:
-- @Stream m a -> Stream m a -> Stream m a@.
-- The functions in this section have a linear or flat n-ary combining
-- characterstics. It means that when combined @n@ times (e.g. @a `serial`
-- b `serial` c ...@) the resulting expression will have an @O(n)@
-- complexity (instead O(n^2) for pair wise combinators described in the
-- next section. These functions can be used efficiently with
-- 'concatMapWith' et. al. combinators that combine streams in a linear
-- fashion (contrast with 'mergeMapWith' which combines streams as a
-- binary tree).
-- * Binary Combinators (Pair Wise)
-- | Like the functions in the section above these functions also combine
-- two streams into a single stream but when used @n@ times linearly they
-- exhibit O(n^2) complexity. They are best combined in a binary tree
-- fashion using 'mergeMapWith' giving a @n * log n@ complexity. Avoid
-- using these with 'concatMapWith' when combining a large or infinite
-- number of streams.
-- ** Append
, append2
-- ** Interleave
, interleave
, interleave2
, interleaveFst
, interleaveFst2
, interleaveFstSuffix2
, interleaveMin
, interleaveMin2
-- ** Round Robin
, roundrobin
-- ** Merge
, mergeBy
, mergeByM
, mergeByM2
, mergeMinBy
, mergeFstBy
-- ** Zip
, zipWith
, zipWithM
-- * Combine Streams and Unfolds
-- |
-- Expand a stream by repeatedly using an unfold and merging the resulting
-- streams. Functions generally ending in the shape:
-- @Unfold m a b -> Stream m a -> Stream m b@
-- ** Unfold and combine streams
-- | Unfold and flatten streams.
, unfoldMany -- XXX Rename to unfoldAppend
, unfoldInterleave
, unfoldRoundRobin
-- ** Interpose
-- | Insert effects between streams. Like unfoldMany but intersperses an
-- effect between the streams. A special case of gintercalate.
, interpose
, interposeSuffix
-- , interposeBy
-- ** Intercalate
-- | Insert Streams between Streams.
-- Like unfoldMany but intersperses streams from another source between
-- the streams from the first source.
, intercalate
, intercalateSuffix
, gintercalate
, gintercalateSuffix
-- * Combine Streams of Streams
-- | Map and serially append streams. 'concatMapM' is a generalization of
-- the binary append operation to append many streams.
, concatMapM
, concatMap
, concatEffect
, concat
-- * ConcatMapWith
-- | Map and flatten a stream like 'concatMap' but using a custom binary
-- stream merging combinator instead of just appending the streams. The
-- merging occurs sequentially, it works efficiently for 'serial', 'async',
-- 'ahead' like merge operations where we consume one stream before the
-- next or in case of 'wAsync' or 'parallel' where we consume all streams
-- simultaneously anyway.
-- However, in cases where the merging consumes streams in a round robin
-- fashion, a pair wise merging using 'mergeMapWith' would be more
-- efficient. These cases include operations like 'mergeBy' or 'zipWith'.
, concatMapWith
, bindWith
, concatSmapMWith
-- * MergeMapWith
-- | See the notes about suitable merge functions in the 'concatMapWith'
-- section.
, mergeMapWith
-- * Iterate
-- | Map and flatten Trees of Streams
, unfoldIterateDfs
, unfoldIterateBfs
, unfoldIterateBfsRev
, concatIterateWith
, mergeIterateWith
, concatIterateDfs
, concatIterateBfs
-- More experimental ops
, concatIterateBfsRev
, concatIterateLeftsWith
, concatIterateScanWith
, concatIterateScan
#include "inline.hs"
import Streamly.Internal.Data.Stream.Bottom
( concatEffect, concatMapM, concatMap, smapM, zipWith, zipWithM)
import Streamly.Internal.Data.Stream.Type
( Stream, fromStreamD, fromStreamK, toStreamD, toStreamK
, bindWith, concatMapWith, cons, nil)
import Streamly.Internal.Data.Unfold.Type (Unfold)
import qualified Streamly.Internal.Data.Stream as D
import qualified Streamly.Internal.Data.StreamK as K (mergeBy, mergeByM)
import qualified Streamly.Internal.Data.Stream.StreamK.Type as K
import Prelude hiding (concat, concatMap, zipWith)
-- $setup
-- >>> :m
-- >>> import Data.Either (either)
-- >>> import Data.IORef
-- >>> import Streamly.Internal.Data.Stream (Stream)
-- >>> import Prelude hiding (zipWith, concatMap, concat)
-- >>> import qualified Streamly.Data.Array as Array
-- >>> import qualified Streamly.Internal.Data.Fold as Fold
-- >>> import qualified Streamly.Internal.Data.Stream as Stream
-- >>> import qualified Streamly.Internal.Data.Unfold as Unfold
-- >>> import qualified Streamly.Internal.Data.Parser as Parser
-- >>> import qualified Streamly.Internal.FileSystem.Dir as Dir
-- Appending
infixr 6 `append2`
-- | This is fused version of 'append'. It could be up to 100x faster than
-- 'append' when combining two fusible streams. However, it slows down
-- quadratically with the number of streams being appended. Therefore, it is
-- suitable for ad-hoc append of a few streams, and should not be used with
-- 'concatMapWith' or 'mergeMapWith'.
-- /Fused/
{-# INLINE append2 #-}
append2 ::Monad m => Stream m b -> Stream m b -> Stream m b
append2 m1 m2 = fromStreamD $ D.append (toStreamD m1) (toStreamD m2)
infixr 6 `append`
-- | Appends two streams sequentially, yielding all elements from the first
-- stream, and then all elements from the second stream.
-- >>> s1 = Stream.fromList [1,2]
-- >>> s2 = Stream.fromList [3,4]
-- >>> Stream.fold Fold.toList $ s1 `Stream.append` s2
-- [1,2,3,4]
-- This has O(n) append performance where @n@ is the number of streams. It can
-- be used to efficiently fold an infinite lazy container of streams
-- 'concatMapWith' et. al.
-- See 'append2' for a fusible alternative.
-- /CPS/
{-# INLINE append #-}
append :: Stream m a -> Stream m a -> Stream m a
append = (<>)
-- Interleaving
infixr 6 `interleave`
-- | Interleaves two streams, yielding one element from each stream
-- alternately. When one stream stops the rest of the other stream is used in
-- the output stream.
-- When joining many streams in a left associative manner earlier streams will
-- get exponential priority than the ones joining later. Because of exponential
-- weighting it can be used with 'concatMapWith' even on a large number of
-- streams.
-- See 'interleave2' for a fusible alternative.
-- /CPS/
{-# INLINE interleave #-}
interleave :: Stream m a -> Stream m a -> Stream m a
interleave s1 s2 = fromStreamK $ K.interleave (toStreamK s1) (toStreamK s2)
{-# INLINE interleave2 #-}
interleave2 :: Monad m => Stream m a -> Stream m a -> Stream m a
interleave2 s1 s2 = fromStreamD $ D.interleave (toStreamD s1) (toStreamD s2)
-- | Like `interleave` but stops interleaving as soon as the first stream
-- stops.
-- See 'interleaveFst2' for a fusible alternative.
-- /CPS/
{-# INLINE interleaveFst #-}
interleaveFst :: Stream m a -> Stream m a -> Stream m a
interleaveFst s1 s2 =
fromStreamK $ K.interleaveFst (toStreamK s1) (toStreamK s2)
-- | Like `interleave` but stops interleaving as soon as any of the two streams
-- stops.
-- See 'interleaveMin2' for a fusible alternative.
-- /CPS/
{-# INLINE interleaveMin #-}
interleaveMin :: Stream m a -> Stream m a -> Stream m a
interleaveMin s1 s2 =
fromStreamK $ K.interleaveMin (toStreamK s1) (toStreamK s2)
{-# INLINE interleaveMin2 #-}
interleaveMin2 :: Monad m => Stream m a -> Stream m a -> Stream m a
interleaveMin2 s1 s2 =
fromStreamD $ D.interleaveMin (toStreamD s1) (toStreamD s2)
-- | Interleaves the outputs of two streams, yielding elements from each stream
-- alternately, starting from the first stream. As soon as the first stream
-- finishes, the output stops, discarding the remaining part of the second
-- stream. In this case, the last element in the resulting stream would be from
-- the second stream. If the second stream finishes early then the first stream
-- still continues to yield elements until it finishes.
-- >>> :set -XOverloadedStrings
-- >>> import Data.Functor.Identity (Identity)
-- >>> Stream.interleaveFstSuffix2 "abc" ",,,," :: Stream Identity Char
-- fromList "a,b,c,"
-- >>> Stream.interleaveFstSuffix2 "abc" "," :: Stream Identity Char
-- fromList "a,bc"
-- 'interleaveFstSuffix2' is a dual of 'interleaveFst2'.
-- Do not use at scale in concatMapWith.
-- /Pre-release/
{-# INLINE interleaveFstSuffix2 #-}
interleaveFstSuffix2 :: Monad m => Stream m b -> Stream m b -> Stream m b
interleaveFstSuffix2 m1 m2 =
fromStreamD $ D.interleaveFstSuffix (toStreamD m1) (toStreamD m2)
-- | Interleaves the outputs of two streams, yielding elements from each stream
-- alternately, starting from the first stream and ending at the first stream.
-- If the second stream is longer than the first, elements from the second
-- stream are infixed with elements from the first stream. If the first stream
-- is longer then it continues yielding elements even after the second stream
-- has finished.
-- >>> :set -XOverloadedStrings
-- >>> import Data.Functor.Identity (Identity)
-- >>> Stream.interleaveFst2 "abc" ",,,," :: Stream Identity Char
-- fromList "a,b,c"
-- >>> Stream.interleaveFst2 "abc" "," :: Stream Identity Char
-- fromList "a,bc"
-- 'interleaveFst2' is a dual of 'interleaveFstSuffix2'.
-- Do not use at scale in concatMapWith.
-- /Pre-release/
{-# INLINE interleaveFst2 #-}
interleaveFst2 :: Monad m => Stream m b -> Stream m b -> Stream m b
interleaveFst2 m1 m2 =
fromStreamD $ D.interleaveFst (toStreamD m1) (toStreamD m2)
-- Scheduling
-- | Schedule the execution of two streams in a fair round-robin manner,
-- executing each stream once, alternately. Execution of a stream may not
-- necessarily result in an output, a stream may chose to @Skip@ producing an
-- element until later giving the other stream a chance to run. Therefore, this
-- combinator fairly interleaves the execution of two streams rather than
-- fairly interleaving the output of the two streams. This can be useful in
-- co-operative multitasking without using explicit threads. This can be used
-- as an alternative to `async`.
-- Do not use at scale in concatMapWith.
-- /Pre-release/
{-# INLINE roundrobin #-}
roundrobin :: Monad m => Stream m b -> Stream m b -> Stream m b
roundrobin m1 m2 = fromStreamD $ D.roundRobin (toStreamD m1) (toStreamD m2)
-- Merging (sorted streams)
-- | Merge two streams using a comparison function. The head elements of both
-- the streams are compared and the smaller of the two elements is emitted, if
-- both elements are equal then the element from the first stream is used
-- first.
-- If the streams are sorted in ascending order, the resulting stream would
-- also remain sorted in ascending order.
-- >>> s1 = Stream.fromList [1,3,5]
-- >>> s2 = Stream.fromList [2,4,6,8]
-- >>> Stream.fold Fold.toList $ Stream.mergeBy compare s1 s2
-- [1,2,3,4,5,6,8]
-- See 'mergeByM2' for a fusible alternative.
-- /CPS/
{-# INLINE mergeBy #-}
mergeBy :: (a -> a -> Ordering) -> Stream m a -> Stream m a -> Stream m a
mergeBy f m1 m2 = fromStreamK $ K.mergeBy f (toStreamK m1) (toStreamK m2)
-- | Like 'mergeBy' but with a monadic comparison function.
-- Merge two streams randomly:
-- @
-- > randomly _ _ = randomIO >>= \x -> return $ if x then LT else GT
-- > Stream.toList $ Stream.mergeByM randomly (Stream.fromList [1,1,1,1]) (Stream.fromList [2,2,2,2])
-- [2,1,2,2,2,1,1,1]
-- @
-- Merge two streams in a proportion of 2:1:
-- >>> :{
-- do
-- let s1 = Stream.fromList [1,1,1,1,1,1]
-- s2 = Stream.fromList [2,2,2]
-- let proportionately m n = do
-- ref <- newIORef $ cycle $ Prelude.concat [Prelude.replicate m LT, Prelude.replicate n GT]
-- return $ \_ _ -> do
-- r <- readIORef ref
-- writeIORef ref $ Prelude.tail r
-- return $ Prelude.head r
-- f <- proportionately 2 1
-- xs <- Stream.fold Fold.toList $ Stream.mergeByM f s1 s2
-- print xs
-- :}
-- [1,1,2,1,1,2,1,1,2]
-- See 'mergeByM2' for a fusible alternative.
-- /CPS/
{-# INLINE mergeByM #-}
:: Monad m
=> (a -> a -> m Ordering) -> Stream m a -> Stream m a -> Stream m a
mergeByM f m1 m2 = fromStreamK $ K.mergeByM f (toStreamK m1) (toStreamK m2)
-- | Like 'mergeByM' but much faster, works best when merging statically known
-- number of streams. When merging more than two streams try to merge pairs and
-- pair of pairs in a tree like structure.'mergeByM' works better with variable
-- number of streams being merged using 'mergeMapWith'.
-- /Internal/
{-# INLINE mergeByM2 #-}
:: Monad m
=> (a -> a -> m Ordering) -> Stream m a -> Stream m a -> Stream m a
mergeByM2 f m1 m2 =
fromStreamD $ D.mergeByM f (toStreamD m1) (toStreamD m2)
-- | Like 'mergeByM' but stops merging as soon as any of the two streams stops.
-- /Unimplemented/
{-# INLINABLE mergeMinBy #-}
mergeMinBy :: -- Monad m =>
(a -> a -> m Ordering) -> Stream m a -> Stream m a -> Stream m a
mergeMinBy _f _m1 _m2 = undefined
-- fromStreamD $ D.mergeMinBy f (toStreamD m1) (toStreamD m2)
-- | Like 'mergeByM' but stops merging as soon as the first stream stops.
-- /Unimplemented/
{-# INLINABLE mergeFstBy #-}
mergeFstBy :: -- Monad m =>
(a -> a -> m Ordering) -> Stream m a -> Stream m a -> Stream m a
mergeFstBy _f _m1 _m2 = undefined
-- fromStreamK $ D.mergeFstBy f (toStreamD m1) (toStreamD m2)
-- Combine N Streams - unfoldMany
-- | Like 'concatMap' but uses an 'Unfold' for stream generation. Unlike
-- 'concatMap' this can fuse the 'Unfold' code with the inner loop and
-- therefore provide many times better performance.
{-# INLINE unfoldMany #-}
unfoldMany ::Monad m => Unfold m a b -> Stream m a -> Stream m b
unfoldMany u m = fromStreamD $ D.unfoldMany u (toStreamD m)
-- | This does not pair streams like mergeMapWith, instead, it goes through
-- each stream one by one and yields one element from each stream. After it
-- goes to the last stream it reverses the traversal to come back to the first
-- stream yielding elements from each stream on its way back to the first
-- stream and so on.
-- >>> lists = Stream.fromList [[1,1],[2,2],[3,3],[4,4],[5,5]]
-- >>> interleaved = Stream.unfoldInterleave Unfold.fromList lists
-- >>> Stream.fold Fold.toList interleaved
-- [1,2,3,4,5,5,4,3,2,1]
-- Note that this is order of magnitude more efficient than "mergeMapWith
-- wSerial" because of fusion.
-- /Fused/
{-# INLINE unfoldInterleave #-}
unfoldInterleave ::Monad m => Unfold m a b -> Stream m a -> Stream m b
unfoldInterleave u m =
fromStreamD $ D.unfoldInterleave u (toStreamD m)
-- | 'unfoldInterleave' switches to the next stream whenever a value from a
-- stream is yielded, it does not switch on a 'Skip'. So if a stream keeps
-- skipping for long time other streams won't get a chance to run.
-- 'unfoldRoundRobin' switches on Skip as well. So it basically schedules each
-- stream fairly irrespective of whether it produces a value or not.
{-# INLINE unfoldRoundRobin #-}
unfoldRoundRobin ::Monad m => Unfold m a b -> Stream m a -> Stream m b
unfoldRoundRobin u m =
fromStreamD $ D.unfoldRoundRobin u (toStreamD m)
-- Combine N Streams - interpose
-- > interpose x unf str = gintercalate unf str UF.identity (repeat x)
-- | Unfold the elements of a stream, intersperse the given element between the
-- unfolded streams and then concat them into a single stream.
-- >>> unwords = Stream.interpose ' '
-- /Pre-release/
{-# INLINE interpose #-}
interpose :: Monad m
=> c -> Unfold m b c -> Stream m b -> Stream m c
interpose x unf str =
fromStreamD $ D.interpose x unf (toStreamD str)
-- interposeSuffix x unf str = gintercalateSuffix unf str UF.identity (repeat x)
-- | Unfold the elements of a stream, append the given element after each
-- unfolded stream and then concat them into a single stream.
-- >>> unlines = Stream.interposeSuffix '\n'
-- /Pre-release/
{-# INLINE interposeSuffix #-}
interposeSuffix :: Monad m
=> c -> Unfold m b c -> Stream m b -> Stream m c
interposeSuffix x unf str =
fromStreamD $ D.interposeSuffix x unf (toStreamD str)
-- Combine N Streams - intercalate
-- XXX we can swap the order of arguments to gintercalate so that the
-- definition of unfoldMany becomes simpler? The first stream should be
-- infixed inside the second one. However, if we change the order in
-- "interleave" as well similarly, then that will make it a bit unintuitive.
-- > unfoldMany unf str =
-- > gintercalate unf str (UF.nilM (\_ -> return ())) (repeat ())
-- | 'interleaveFst' followed by unfold and concat.
-- /Pre-release/
{-# INLINE gintercalate #-}
:: Monad m
=> Unfold m a c -> Stream m a -> Unfold m b c -> Stream m b -> Stream m c
gintercalate unf1 str1 unf2 str2 =
fromStreamD $ D.gintercalate
unf1 (toStreamD str1)
unf2 (toStreamD str2)
-- > intercalate unf seed str = gintercalate unf str unf (repeatM seed)
-- | 'intersperse' followed by unfold and concat.
-- >>> intercalate u a = Stream.unfoldMany u . Stream.intersperse a
-- >>> intersperse = Stream.intercalate Unfold.identity
-- >>> unwords = Stream.intercalate Unfold.fromList " "
-- >>> input = Stream.fromList ["abc", "def", "ghi"]
-- >>> Stream.fold Fold.toList $ Stream.intercalate Unfold.fromList " " input
-- "abc def ghi"
{-# INLINE intercalate #-}
intercalate :: Monad m
=> Unfold m b c -> b -> Stream m b -> Stream m c
intercalate unf seed str = fromStreamD $
D.unfoldMany unf $ D.intersperse seed (toStreamD str)
-- | 'interleaveFstSuffix2' followed by unfold and concat.
-- /Pre-release/
{-# INLINE gintercalateSuffix #-}
:: Monad m
=> Unfold m a c -> Stream m a -> Unfold m b c -> Stream m b -> Stream m c
gintercalateSuffix unf1 str1 unf2 str2 =
fromStreamD $ D.gintercalateSuffix
unf1 (toStreamD str1)
unf2 (toStreamD str2)
-- > intercalateSuffix unf seed str = gintercalateSuffix unf str unf (repeatM seed)
-- | 'intersperseMSuffix' followed by unfold and concat.
-- >>> intercalateSuffix u a = Stream.unfoldMany u . Stream.intersperseMSuffix a
-- >>> intersperseMSuffix = Stream.intercalateSuffix Unfold.identity
-- >>> unlines = Stream.intercalateSuffix Unfold.fromList "\n"
-- >>> input = Stream.fromList ["abc", "def", "ghi"]
-- >>> Stream.fold Fold.toList $ Stream.intercalateSuffix Unfold.fromList "\n" input
-- "abc\ndef\nghi\n"
{-# INLINE intercalateSuffix #-}
intercalateSuffix :: Monad m
=> Unfold m b c -> b -> Stream m b -> Stream m c
intercalateSuffix unf seed =
fromStreamD . D.intercalateSuffix unf seed . toStreamD
-- Combine N Streams - concatMap
-- | Flatten a stream of streams to a single stream.
-- >>> concat = Stream.concatMap id
-- /Pre-release/
{-# INLINE concat #-}
concat :: Monad m => Stream m (Stream m a) -> Stream m a
concat = concatMap id
-- Combine N Streams - concatMap
-- | Like 'concatMapWith' but carries a state which can be used to share
-- information across multiple steps of concat.
-- >>> concatSmapMWith combine f initial = Stream.concatMapWith combine id . Stream.smapM f initial
-- /Pre-release/
{-# INLINE concatSmapMWith #-}
:: Monad m
=> (Stream m b -> Stream m b -> Stream m b)
-> (s -> a -> m (s, Stream m b))
-> m s
-> Stream m a
-> Stream m b
concatSmapMWith combine f initial =
concatMapWith combine id . smapM f initial
-- XXX Implement a StreamD version for fusion.
-- | Combine streams in pairs using a binary combinator, the resulting streams
-- are then combined again in pairs recursively until we get to a single
-- combined stream. The composition would thus form a binary tree.
-- For example, you can sort a stream using merge sort like this:
-- >>> s = Stream.fromList [5,1,7,9,2]
-- >>> generate = Stream.fromPure
-- >>> combine = Stream.mergeBy compare
-- >>> Stream.fold Fold.toList $ Stream.mergeMapWith combine generate s
-- [1,2,5,7,9]
-- Note that if the stream length is not a power of 2, the binary tree composed
-- by mergeMapWith would not be balanced, which may or may not be important
-- depending on what you are trying to achieve.
-- /Caution: the stream of streams must be finite/
-- /CPS/
-- /Pre-release/
{-# INLINE mergeMapWith #-}
mergeMapWith ::
(Stream m b -> Stream m b -> Stream m b)
-> (a -> Stream m b)
-> Stream m a
-> Stream m b
mergeMapWith par f m =
$ K.mergeMapWith
(\s1 s2 -> toStreamK $ fromStreamK s1 `par` fromStreamK s2)
(toStreamK . f)
(toStreamK m)
-- concatIterate - Map and flatten Trees of Streams
-- | Yield an input element in the output stream, map a stream generator on it
-- and repeat the process on the resulting stream. Resulting streams are
-- flattened using the 'concatMapWith' combinator. This can be used for a depth
-- first style (DFS) traversal of a tree like structure.
-- Example, list a directory tree using DFS:
-- >>> f = either Dir.readEitherPaths (const Stream.nil)
-- >>> input = Stream.fromPure (Left ".")
-- >>> ls = Stream.concatIterateWith Stream.append f input
-- Note that 'iterateM' is a special case of 'concatIterateWith':
-- >>> iterateM f = Stream.concatIterateWith Stream.append (Stream.fromEffect . f) . Stream.fromEffect
-- /CPS/
-- /Pre-release/
{-# INLINE concatIterateWith #-}
concatIterateWith ::
(Stream m a -> Stream m a -> Stream m a)
-> (a -> Stream m a)
-> Stream m a
-> Stream m a
concatIterateWith combine f = iterateStream
iterateStream = concatMapWith combine generate
generate x = x `cons` iterateStream (f x)
-- | Traverse the stream in depth first style (DFS). Map each element in the
-- input stream to a stream and flatten, recursively map the resulting elements
-- as well to a stream and flatten until no more streams are generated.
-- Example, list a directory tree using DFS:
-- >>> f = either (Just . Dir.readEitherPaths) (const Nothing)
-- >>> input = Stream.fromPure (Left ".")
-- >>> ls = Stream.concatIterateDfs f input
-- This is equivalent to using @concatIterateWith Stream.append@.
-- /Pre-release/
{-# INLINE concatIterateDfs #-}
concatIterateDfs :: Monad m =>
(a -> Maybe (Stream m a))
-> Stream m a
-> Stream m a
concatIterateDfs f stream =
$ D.concatIterateDfs (fmap toStreamD . f ) (toStreamD stream)
-- | Similar to 'concatIterateDfs' except that it traverses the stream in
-- breadth first style (BFS). First, all the elements in the input stream are
-- emitted, and then their traversals are emitted.
-- Example, list a directory tree using BFS:
-- >>> f = either (Just . Dir.readEitherPaths) (const Nothing)
-- >>> input = Stream.fromPure (Left ".")
-- >>> ls = Stream.concatIterateBfs f input
-- /Pre-release/
{-# INLINE concatIterateBfs #-}
concatIterateBfs :: Monad m =>
(a -> Maybe (Stream m a))
-> Stream m a
-> Stream m a
concatIterateBfs f stream =
$ D.concatIterateBfs (fmap toStreamD . f ) (toStreamD stream)
-- | Same as 'concatIterateBfs' except that the traversal of the last
-- element on a level is emitted first and then going backwards up to the first
-- element (reversed ordering). This may be slightly faster than
-- 'concatIterateBfs'.
{-# INLINE concatIterateBfsRev #-}
concatIterateBfsRev :: Monad m =>
(a -> Maybe (Stream m a))
-> Stream m a
-> Stream m a
concatIterateBfsRev f stream =
$ D.concatIterateBfsRev (fmap toStreamD . f ) (toStreamD stream)
-- | Like 'concatIterateWith' but uses the pairwise flattening combinator
-- 'mergeMapWith' for flattening the resulting streams. This can be used for a
-- balanced traversal of a tree like structure.
-- Example, list a directory tree using balanced traversal:
-- >>> f = either Dir.readEitherPaths (const Stream.nil)
-- >>> input = Stream.fromPure (Left ".")
-- >>> ls = Stream.mergeIterateWith Stream.interleave f input
-- /CPS/
-- /Pre-release/
{-# INLINE mergeIterateWith #-}
mergeIterateWith ::
(Stream m a -> Stream m a -> Stream m a)
-> (a -> Stream m a)
-> Stream m a
-> Stream m a
mergeIterateWith combine f = iterateStream
iterateStream = mergeMapWith combine generate
generate x = x `cons` iterateStream (f x)
-- | Same as @concatIterateDfs@ but more efficient due to stream fusion.
-- Example, list a directory tree using DFS:
-- >>> f = Unfold.either Dir.eitherReaderPaths Unfold.nil
-- >>> input = Stream.fromPure (Left ".")
-- >>> ls = Stream.unfoldIterateDfs f input
-- /Pre-release/
{-# INLINE unfoldIterateDfs #-}
unfoldIterateDfs :: Monad m => Unfold m a a -> Stream m a -> Stream m a
unfoldIterateDfs u = fromStreamD . D.unfoldIterateDfs u . toStreamD
-- | Like 'unfoldIterateDfs' but uses breadth first style traversal.
-- /Pre-release/
{-# INLINE unfoldIterateBfs #-}
unfoldIterateBfs :: Monad m => Unfold m a a -> Stream m a -> Stream m a
unfoldIterateBfs u = fromStreamD . D.unfoldIterateBfs u . toStreamD
-- | Like 'unfoldIterateBfs' but processes the children in reverse order,
-- therefore, may be slightly faster.
-- /Pre-release/
{-# INLINE unfoldIterateBfsRev #-}
unfoldIterateBfsRev :: Monad m => Unfold m a a -> Stream m a -> Stream m a
unfoldIterateBfsRev u =
fromStreamD . D.unfoldIterateBfsRev u . toStreamD
-- Flattening Graphs
-- To traverse graphs we need a state to be carried around in the traversal.
-- For example, we can use a hashmap to store the visited status of nodes.
-- | Like 'iterateMap' but carries a state in the stream generation function.
-- This can be used to traverse graph like structures, we can remember the
-- visited nodes in the state to avoid cycles.
-- Note that a combination of 'iterateMap' and 'usingState' can also be used to
-- traverse graphs. However, this function provides a more localized state
-- instead of using a global state.
-- See also: 'mfix'
-- /Pre-release/
{-# INLINE concatIterateScanWith #-}
:: Monad m
=> (Stream m a -> Stream m a -> Stream m a)
-> (b -> a -> m (b, Stream m a))
-> m b
-> Stream m a
-> Stream m a
concatIterateScanWith combine f initial stream =
concatEffect $ do
b <- initial
iterateStream (b, stream)
iterateStream (b, s) = pure $ concatMapWith combine (generate b) s
generate b a = a `cons` feedback b a
feedback b a = concatEffect $ f b a >>= iterateStream
-- Next stream is to be generated by the return value of the previous stream. A
-- general intuitive way of doing that could be to use an appending monad
-- instance for streams where the result of the previous stream is used to
-- generate the next one. In the first pass we can just emit the values in the
-- stream and keep building a buffered list/stream, once done we can then
-- process the buffered stream.
{-# INLINE concatIterateScan #-}
:: Monad m
=> (b -> a -> m b)
-> (b -> m (Maybe (b, Stream m a)))
-> b
-> Stream m a
concatIterateScan scanner generate initial =
$ D.concatIterateScan
scanner (fmap (fmap (fmap toStreamD)) . generate) initial
-- Either streams
-- Keep concating either streams as long as rights are generated, stop as soon
-- as a left is generated and concat the left stream.
-- See also: 'handle'
-- /Unimplemented/
:: (forall x. t m x -> t m x -> t m x)
-> (a -> t m (Either (Stream m b) b))
-> Stream m a
-> Stream m b
concatMapEitherWith = undefined
-- XXX We should prefer using the Maybe stream returning signatures over this.
-- This API should perhaps be removed in favor of those.
-- | In an 'Either' stream iterate on 'Left's. This is a special case of
-- 'concatIterateWith':
-- >>> concatIterateLeftsWith combine f = Stream.concatIterateWith combine (either f (const Stream.nil))
-- To traverse a directory tree:
-- >>> input = Stream.fromPure (Left ".")
-- >>> ls = Stream.concatIterateLeftsWith Stream.append Dir.readEither input
-- /Pre-release/
{-# INLINE concatIterateLeftsWith #-}
:: (b ~ Either a c)
=> (Stream m b -> Stream m b -> Stream m b)
-> (a -> Stream m b)
-> Stream m b
-> Stream m b
concatIterateLeftsWith combine f =
concatIterateWith combine (either f (const nil))

{-# OPTIONS_GHC -Wno-orphans #-}
-- |
-- Module : Streamly.Internal.Data.Stream.Generate
-- Copyright : (c) 2017 Composewell Technologies
-- License : BSD-3-Clause
-- Maintainer :
-- Stability : experimental
-- Portability : GHC
module Streamly.Internal.Data.Stream.Generate
-- * Primitives
, Stream.nilM
, Stream.cons
, Stream.consM
-- * From 'Unfold'
, unfold
-- * Unfolding
, unfoldr
, unfoldrM
-- * From Values
, Stream.fromPure
, Stream.fromEffect
, repeat
, repeatM
, replicate
, replicateM
-- * Enumeration
, Enumerable (..)
, enumerate
, enumerateTo
-- * Time Enumeration
, times
, timesWith
, absTimes
, absTimesWith
, relTimes
, relTimesWith
, durations
, timeout
-- * Iteration
, iterate
, iterateM
-- * Cyclic Elements
, mfix
-- * From Containers
, Bottom.fromList
, fromFoldable
-- * From memory
, fromPtr
, fromPtrN
, fromByteStr#
-- , fromByteArray#
, fromUnboxedIORef
#include "inline.hs"
import Control.Monad.IO.Class (MonadIO)
import Data.Word (Word8)
import Foreign.Storable (Storable)
import GHC.Exts (Addr#, Ptr (Ptr))
import Streamly.Internal.Data.Stream.Bottom
(absTimesWith, relTimesWith, timesWith)
import Streamly.Internal.Data.Stream.Enumerate
(Enumerable(..), enumerate, enumerateTo)
import Streamly.Internal.Data.Stream.Type
(Stream, fromStreamD, fromStreamK, toStreamK)
import Streamly.Internal.Data.Time.Units (AbsTime, RelTime64, addToAbsTime64)
import Streamly.Internal.Data.Unbox (Unbox)
import Streamly.Internal.Data.Unfold.Type (Unfold)
import qualified Streamly.Internal.Data.IORef.Unboxed as Unboxed
import qualified Streamly.Internal.Data.Stream.Bottom as Bottom
import qualified Streamly.Internal.Data.Stream as D
import qualified Streamly.Internal.Data.Stream.StreamK.Type as K
import qualified Streamly.Internal.Data.Stream.Type as Stream
import qualified Streamly.Internal.Data.Stream.Transform as Stream (sequence)
import Prelude hiding (iterate, replicate, repeat, take)
-- $setup
-- >>> :m
-- >>> import Control.Concurrent (threadDelay)
-- >>> import Data.Function (fix, (&))
-- >>> import Data.Semigroup (cycle1)
-- >>> import Streamly.Internal.Data.Stream.Cross (CrossStream(..))
-- >>> import qualified Streamly.Data.Fold as Fold
-- >>> import qualified Streamly.Data.Unfold as Unfold
-- >>> import qualified Streamly.Internal.Data.Stream as Stream
-- >>> import GHC.Exts (Ptr (Ptr))
-- From Unfold
-- | Convert an 'Unfold' into a stream by supplying it an input seed.
-- >>> s = Stream.unfold Unfold.replicateM (3, putStrLn "hello")
-- >>> Stream.fold Fold.drain s
-- hello
-- hello
-- hello
{-# INLINE unfold #-}
unfold :: Monad m => Unfold m a b -> a -> Stream m b
unfold unf = Stream.fromStreamD . D.unfold unf
-- Generation by Unfolding
-- |
-- >>> :{
-- unfoldr step s =
-- case step s of
-- Nothing -> Stream.nil
-- Just (a, b) -> a `Stream.cons` unfoldr step b
-- :}
-- Build a stream by unfolding a /pure/ step function @step@ starting from a
-- seed @s@. The step function returns the next element in the stream and the
-- next seed value. When it is done it returns 'Nothing' and the stream ends.
-- For example,
-- >>> :{
-- let f b =
-- if b > 2
-- then Nothing
-- else Just (b, b + 1)
-- in Stream.fold Fold.toList $ Stream.unfoldr f 0
-- :}
-- [0,1,2]
{-# INLINE_EARLY unfoldr #-}
unfoldr :: Monad m => (b -> Maybe (a, b)) -> b -> Stream m a
unfoldr step seed = fromStreamD (D.unfoldr step seed)
{-# RULES "unfoldr fallback to StreamK" [1]
forall a b. D.toStreamK (D.unfoldr a b) = K.unfoldr a b #-}
-- | Build a stream by unfolding a /monadic/ step function starting from a
-- seed. The step function returns the next element in the stream and the next
-- seed value. When it is done it returns 'Nothing' and the stream ends. For
-- example,
-- >>> :{
-- let f b =
-- if b > 2
-- then return Nothing
-- else return (Just (b, b + 1))
-- in Stream.fold Fold.toList $ Stream.unfoldrM f 0
-- :}
-- [0,1,2]
{-# INLINE unfoldrM #-}
unfoldrM :: Monad m => (b -> m (Maybe (a, b))) -> b -> Stream m a
unfoldrM step = fromStreamD . D.unfoldrM step
-- From Values
-- |
-- Generate an infinite stream by repeating a pure value.
{-# INLINE_NORMAL repeat #-}
repeat :: Monad m => a -> Stream m a
repeat = fromStreamD . D.repeat
-- |
-- >>> repeatM = Stream.sequence . Stream.repeat
-- >>> repeatM = fix . Stream.consM
-- >>> repeatM = cycle1 . Stream.fromEffect
-- Generate a stream by repeatedly executing a monadic action forever.
-- >>> :{
-- repeatAction =
-- Stream.repeatM (threadDelay 1000000 >> print 1)
-- & Stream.take 10
-- & Stream.fold Fold.drain
-- :}
{-# INLINE_NORMAL repeatM #-}
repeatM :: Monad m => m a -> Stream m a
repeatM = Stream.sequence . repeat
-- |
-- >>> replicate n = Stream.take n . Stream.repeat
-- Generate a stream of length @n@ by repeating a value @n@ times.
{-# INLINE_NORMAL replicate #-}
replicate :: Monad m => Int -> a -> Stream m a
replicate n = fromStreamD . D.replicate n
-- |
-- >>> replicateM n = Stream.sequence . Stream.replicate n
-- Generate a stream by performing a monadic action @n@ times.
{-# INLINE_NORMAL replicateM #-}
replicateM :: Monad m => Int -> m a -> Stream m a
replicateM n = Stream.sequence . replicate n
-- Time Enumeration
-- | @times@ returns a stream of time value tuples with clock of 10 ms
-- granularity. The first component of the tuple is an absolute time reference
-- (epoch) denoting the start of the stream and the second component is a time
-- relative to the reference.
-- >>> f = Fold.drainMapM (\x -> print x >> threadDelay 1000000)
-- >>> Stream.fold f $ Stream.take 3 $ Stream.times
-- (AbsTime (TimeSpec {sec = ..., nsec = ...}),RelTime64 (NanoSecond64 ...))
-- (AbsTime (TimeSpec {sec = ..., nsec = ...}),RelTime64 (NanoSecond64 ...))
-- (AbsTime (TimeSpec {sec = ..., nsec = ...}),RelTime64 (NanoSecond64 ...))
-- Note: This API is not safe on 32-bit machines.
-- /Pre-release/
{-# INLINE times #-}
times :: MonadIO m => Stream m (AbsTime, RelTime64)
times = timesWith 0.01
-- | @absTimes@ returns a stream of absolute timestamps using a clock of 10 ms
-- granularity.
-- >>> f = Fold.drainMapM print
-- >>> Stream.fold f $ Stream.delayPre 1 $ Stream.take 3 $ Stream.absTimes
-- AbsTime (TimeSpec {sec = ..., nsec = ...})
-- AbsTime (TimeSpec {sec = ..., nsec = ...})
-- AbsTime (TimeSpec {sec = ..., nsec = ...})
-- Note: This API is not safe on 32-bit machines.
-- /Pre-release/
{-# INLINE absTimes #-}
absTimes :: MonadIO m => Stream m AbsTime
absTimes = fmap (uncurry addToAbsTime64) times
-- | @relTimes@ returns a stream of relative time values starting from 0,
-- using a clock of granularity 10 ms.
-- >>> f = Fold.drainMapM print
-- >>> Stream.fold f $ Stream.delayPre 1 $ Stream.take 3 $ Stream.relTimes
-- RelTime64 (NanoSecond64 ...)
-- RelTime64 (NanoSecond64 ...)
-- RelTime64 (NanoSecond64 ...)
-- Note: This API is not safe on 32-bit machines.
-- /Pre-release/
{-# INLINE relTimes #-}
relTimes :: MonadIO m => Stream m RelTime64
relTimes = fmap snd times
-- | @durations g@ returns a stream of relative time values measuring the time
-- elapsed since the immediate predecessor element of the stream was generated.
-- The first element of the stream is always 0. @durations@ uses a clock of
-- granularity @g@ specified in seconds. A low granularity clock is more
-- expensive in terms of CPU usage. The minimum granularity is 1 millisecond.
-- Durations lower than 1 ms will be 0.
-- Note: This API is not safe on 32-bit machines.
-- /Unimplemented/
{-# INLINE durations #-}
durations :: -- Monad m =>
Double -> t m RelTime64
durations = undefined
-- | Generate a singleton event at or after the specified absolute time. Note
-- that this is different from a threadDelay, a threadDelay starts from the
-- time when the action is evaluated, whereas if we use AbsTime based timeout
-- it will immediately expire if the action is evaluated too late.
-- /Unimplemented/
{-# INLINE timeout #-}
timeout :: -- Monad m =>
AbsTime -> t m ()
timeout = undefined
-- Iterating functions
-- |
-- >>> iterate f x = x `Stream.cons` iterate f x
-- Generate an infinite stream with @x@ as the first element and each
-- successive element derived by applying the function @f@ on the previous
-- element.
-- >>> Stream.fold Fold.toList $ Stream.take 5 $ Stream.iterate (+1) 1
-- [1,2,3,4,5]
{-# INLINE_NORMAL iterate #-}
iterate :: Monad m => (a -> a) -> a -> Stream m a
iterate step = fromStreamD . D.iterate step
-- |
-- >>> iterateM f m = m >>= \a -> return a `Stream.consM` iterateM f (f a)
-- Generate an infinite stream with the first element generated by the action
-- @m@ and each successive element derived by applying the monadic function
-- @f@ on the previous element.
-- >>> :{
-- Stream.iterateM (\x -> print x >> return (x + 1)) (return 0)
-- & Stream.take 3
-- & Stream.fold Fold.toList
-- :}
-- 0
-- 1
-- [0,1,2]
{-# INLINE iterateM #-}
iterateM :: Monad m => (a -> m a) -> m a -> Stream m a
iterateM step = fromStreamD . D.iterateM step
-- | We can define cyclic structures using @let@:
-- >>> let (a, b) = ([1, b], head a) in (a, b)
-- ([1,1],1)
-- The function @fix@ defined as:
-- >>> fix f = let x = f x in x
-- ensures that the argument of a function and its output refer to the same
-- lazy value @x@ i.e. the same location in memory. Thus @x@ can be defined
-- in terms of itself, creating structures with cyclic references.
-- >>> f ~(a, b) = ([1, b], head a)
-- >>> fix f
-- ([1,1],1)
-- 'Control.Monad.mfix' is essentially the same as @fix@ but for monadic
-- values.
-- Using 'mfix' for streams we can construct a stream in which each element of
-- the stream is defined in a cyclic fashion. The argument of the function
-- being fixed represents the current element of the stream which is being
-- returned by the stream monad. Thus, we can use the argument to construct
-- itself.
-- In the following example, the argument @action@ of the function @f@
-- represents the tuple @(x,y)@ returned by it in a given iteration. We define
-- the first element of the tuple in terms of the second.
-- >>> import Streamly.Internal.Data.Stream as Stream
-- >>> import System.IO.Unsafe (unsafeInterleaveIO)
-- >>> :{
-- main = Stream.fold (Fold.drainMapM print) $ Stream.mfix f
-- where
-- f action = unCrossStream $ do
-- let incr n act = fmap ((+n) . snd) $ unsafeInterleaveIO act
-- x <- CrossStream (Stream.sequence $ Stream.fromList [incr 1 action, incr 2 action])
-- y <- CrossStream (Stream.fromList [4,5])
-- return (x, y)
-- :}
-- Note: you cannot achieve this by just changing the order of the monad
-- statements because that would change the order in which the stream elements
-- are generated.
-- Note that the function @f@ must be lazy in its argument, that's why we use
-- 'unsafeInterleaveIO' on @action@ because IO monad is strict.
-- /CPS/
-- /Pre-release/
{-# INLINE mfix #-}
mfix :: Monad m => (m a -> Stream m a) -> Stream m a
mfix f = fromStreamK $ K.mfix (toStreamK . f)
-- Conversions
-- |
-- >>> fromFoldable = Prelude.foldr Stream.cons Stream.nil
-- Construct a stream from a 'Foldable' containing pure values:
-- /CPS/
{-# INLINE fromFoldable #-}
fromFoldable :: Foldable f => f a -> Stream m a
fromFoldable = fromStreamK . K.fromFoldable
-- From pointers
-- | Keep reading 'Storable' elements from 'Ptr' onwards.
-- /Unsafe:/ The caller is responsible for safe addressing.
-- /Pre-release/
{-# INLINE fromPtr #-}
fromPtr :: (MonadIO m, Storable a) => Ptr a -> Stream m a
fromPtr = Stream.fromStreamD . D.fromPtr
-- | Take @n@ 'Storable' elements starting from 'Ptr' onwards.
-- >>> fromPtrN n = Stream.take n . Stream.fromPtr
-- /Unsafe:/ The caller is responsible for safe addressing.
-- /Pre-release/
{-# INLINE fromPtrN #-}
fromPtrN :: (MonadIO m, Storable a) => Int -> Ptr a -> Stream m a
fromPtrN n = Stream.fromStreamD . D.take n . D.fromPtr
-- | Read bytes from an 'Addr#' until a 0 byte is encountered, the 0 byte is
-- not included in the stream.
-- >>> fromByteStr# addr = Stream.takeWhile (/= 0) $ Stream.fromPtr $ Ptr addr
-- /Unsafe:/ The caller is responsible for safe addressing.
-- Note that this is completely safe when reading from Haskell string
-- literals because they are guaranteed to be NULL terminated:
-- >>> Stream.fold Fold.toList $ Stream.fromByteStr# "\1\2\3\0"#
-- [1,2,3]
{-# INLINE fromByteStr# #-}
fromByteStr# :: MonadIO m => Addr# -> Stream m Word8
fromByteStr# addr =
Stream.fromStreamD $ D.takeWhile (/= 0) $ D.fromPtr $ Ptr addr
-- | Construct a stream by reading an 'Unboxed' 'IORef' repeatedly.
-- /Pre-release/
{-# INLINE fromUnboxedIORef #-}
fromUnboxedIORef :: (MonadIO m, Unbox a) => Unboxed.IORef a -> Stream m a
fromUnboxedIORef = fromStreamD . Unboxed.toStreamD

View File

@ -1,95 +0,0 @@
-- |
-- Module : Streamly.Internal.Data.Stream.Lift
-- Copyright : (c) 2019 Composewell Technologies
-- License : BSD-3-Clause
-- Maintainer :
-- Stability : experimental
-- Portability : GHC
module Streamly.Internal.Data.Stream.Lift
-- * Generalize Inner Monad
, generalizeInner
-- * Transform Inner Monad
, liftInnerWith
, runInnerWith
, runInnerWithState
import Data.Functor.Identity (Identity (..))
import Streamly.Internal.Data.Stream.Type
(Stream, fromStreamD, toStreamD, fromStreamK, toStreamK)
import qualified Streamly.Internal.Data.Stream as D
import qualified Streamly.Internal.Data.StreamK as K
-- $setup
-- >>> :m
-- >>> import Data.Functor.Identity (runIdentity)
-- >>> import Streamly.Internal.Data.Stream as Stream
-- Generalize the underlying monad
-- | Transform the inner monad of a stream using a natural transformation.
-- Example, generalize the inner monad from Identity to any other:
-- >>> generalizeInner = Stream.morphInner (return . runIdentity)
-- Also known as hoist.
-- /CPS/
{-# INLINE morphInner #-}
morphInner :: (Monad m, Monad n)
=> (forall x. m x -> n x) -> Stream m a -> Stream n a
morphInner f xs = fromStreamK $ K.hoist f (toStreamK xs)
-- | Generalize the inner monad of the stream from 'Identity' to any monad.
-- Definition:
-- >>> generalizeInner = Stream.morphInner (return . runIdentity)
-- /CPS/
{-# INLINE generalizeInner #-}
generalizeInner :: Monad m => Stream Identity a -> Stream m a
generalizeInner = morphInner (return . runIdentity)
-- fromStreamK $ K.hoist (return . runIdentity) (toStreamK xs)
-- Add and remove a monad transformer
-- | Lift the inner monad @m@ of a stream @Stream m a@ to @t m@ using the
-- supplied lift function.
{-# INLINE liftInnerWith #-}
liftInnerWith :: (Monad m, Monad (t m))
=> (forall b. m b -> t m b) -> Stream m a -> Stream (t m) a
liftInnerWith lift xs = fromStreamD $ D.liftInnerWith lift (toStreamD xs)
-- | Evaluate the inner monad of a stream using the supplied runner function.
{-# INLINE runInnerWith #-}
runInnerWith :: (Monad m, Applicative (t m)) =>
(forall b. t m b -> m b) -> Stream (t m) a -> Stream m a
runInnerWith run xs = fromStreamD $ D.runInnerWith run (toStreamD xs)
-- | Evaluate the inner monad of a stream using the supplied stateful runner
-- function and the initial state. The state returned by an invocation of the
-- runner is supplied as input state to the next invocation.
{-# INLINE runInnerWithState #-}
runInnerWithState :: (Monad m, Applicative (t m)) =>
(forall b. s -> t m b -> m (b, s))
-> m s
-> Stream (t m) a
-> Stream m (s, a)
runInnerWithState run initial xs =
fromStreamD $ D.runInnerWithState run initial (toStreamD xs)

View File

@ -1,444 +0,0 @@
-- |
-- Module : Streamly.Internal.Data.Stream.Reduce
-- Copyright : (c) 2017 Composewell Technologies
-- License : BSD-3-Clause
-- Maintainer :
-- Stability : experimental
-- Portability : GHC
-- Reduce streams by streams, folds or parsers.
module Streamly.Internal.Data.Stream.Reduce
-- * Reduce By Streams
, dropInfix
, dropSuffix
-- * Reduce By Folds
-- |
-- Reduce a stream by folding or parsing chunks of the stream. Functions
-- generally ending in these shapes:
-- @
-- f (Fold m a b) -> Stream m a -> Stream m b
-- f (Parser a m b) -> Stream m a -> Stream m b
-- @
-- ** Generic Folding
-- | Apply folds on a stream.
, foldMany
, foldManyPost
, refoldMany
, foldSequence
, foldIterateM
, refoldIterateM
, reduceIterateBfs
-- ** Chunking
-- | Element unaware grouping.
, chunksOf
-- ** Splitting
-- XXX Implement these as folds or parsers instead.
, splitOnSuffixSeqAny
, splitOnPrefix
, splitOnAny
-- * Reduce By Parsers
-- ** Generic Parsing
-- | Apply parsers on a stream.
, parseMany
, parseManyD
, parseManyTill
, parseSequence
, parseIterate
import Control.Monad.IO.Class (MonadIO(..))
import Streamly.Internal.Data.Array.Type (Array)
import Streamly.Internal.Data.Fold.Type (Fold (..))
import Streamly.Internal.Data.Parser (Parser (..))
import Streamly.Internal.Data.Parser (ParseError)
import Streamly.Internal.Data.Refold.Type (Refold (..))
import Streamly.Internal.Data.Stream.Bottom (foldManyPost)
import Streamly.Internal.Data.Stream.Type (Stream, fromStreamD, toStreamD)
import Streamly.Internal.Data.Unbox (Unbox)
import qualified Streamly.Internal.Data.Array.Type as Array
import qualified Streamly.Internal.Data.Parser as ParserD
import qualified Streamly.Internal.Data.Stream as D
import Prelude hiding (concatMap, map)
-- $setup
-- >>> :m
-- >>> import Prelude hiding (zipWith, concatMap, concat)
-- >>> import Streamly.Internal.Data.Stream as Stream
-- >>> import qualified Streamly.Data.Fold as Fold
-- >>> import qualified Streamly.Internal.Data.Fold as Fold
-- >>> import qualified Streamly.Internal.Data.Unfold as Unfold
-- >>> import qualified Streamly.Internal.Data.Parser as Parser
-- >>> import qualified Streamly.Data.Array as Array
-- Trimming
-- | Drop prefix from the input stream if present.
-- Space: @O(1)@
-- /Unimplemented/
{-# INLINE dropPrefix #-}
dropPrefix ::
-- (Monad m, Eq a) =>
Stream m a -> Stream m a -> Stream m a
dropPrefix = error "Not implemented yet!"
-- | Drop all matching infix from the input stream if present. Infix stream
-- may be consumed multiple times.
-- Space: @O(n)@ where n is the length of the infix.
-- /Unimplemented/
{-# INLINE dropInfix #-}
dropInfix ::
-- (Monad m, Eq a) =>
Stream m a -> Stream m a -> Stream m a
dropInfix = error "Not implemented yet!"
-- | Drop suffix from the input stream if present. Suffix stream may be
-- consumed multiple times.
-- Space: @O(n)@ where n is the length of the suffix.
-- /Unimplemented/
{-# INLINE dropSuffix #-}
dropSuffix ::
-- (Monad m, Eq a) =>
Stream m a -> Stream m a -> Stream m a
dropSuffix = error "Not implemented yet!"
-- Folding
-- | Apply a 'Fold' repeatedly on a stream and emit the results in the
-- output stream. Unlike 'foldManyPost' it evaluates the fold after the stream,
-- therefore, an empty input stream results in an empty output stream.
-- Definition:
-- >>> foldMany f = Stream.parseMany (Parser.fromFold f)
-- Example, empty stream:
-- >>> f = Fold.take 2 Fold.sum
-- >>> fmany = Stream.fold Fold.toList . Stream.foldMany f
-- >>> fmany $ Stream.fromList []
-- []
-- Example, last fold empty:
-- >>> fmany $ Stream.fromList [1..4]
-- [3,7]
-- Example, last fold non-empty:
-- >>> fmany $ Stream.fromList [1..5]
-- [3,7,5]
-- Note that using a closed fold e.g. @Fold.take 0@, would result in an
-- infinite stream on a non-empty input stream.
{-# INLINE foldMany #-}
:: Monad m
=> Fold m a b
-> Stream m a
-> Stream m b
foldMany f m = fromStreamD $ D.foldMany f (toStreamD m)
-- | Like 'foldMany' but using the 'Refold' type instead of 'Fold'.
-- /Pre-release/
{-# INLINE refoldMany #-}
refoldMany :: Monad m =>
Refold m c a b -> m c -> Stream m a -> Stream m b
refoldMany f action = fromStreamD . D.refoldMany f action . toStreamD
-- | Apply a stream of folds to an input stream and emit the results in the
-- output stream.
-- /Unimplemented/
{-# INLINE foldSequence #-}
:: -- Monad m =>
Stream m (Fold m a b)
-> Stream m a
-> Stream m b
foldSequence _f _m = undefined
-- | Iterate a fold generator on a stream. The initial value @b@ is used to
-- generate the first fold, the fold is applied on the stream and the result of
-- the fold is used to generate the next fold and so on.
-- >>> import Data.Monoid (Sum(..))
-- >>> f x = return (Fold.take 2 (Fold.sconcat x))
-- >>> s = fmap Sum $ Stream.fromList [1..10]
-- >>> Stream.fold Fold.toList $ fmap getSum $ Stream.foldIterateM f (pure 0) s
-- [3,10,21,36,55,55]
-- This is the streaming equivalent of monad like sequenced application of
-- folds where next fold is dependent on the previous fold.
-- /Pre-release/
{-# INLINE foldIterateM #-}
foldIterateM ::
Monad m => (b -> m (Fold m a b)) -> m b -> Stream m a -> Stream m b
foldIterateM f i m = fromStreamD $ D.foldIterateM f i (toStreamD m)
-- | Like 'foldIterateM' but using the 'Refold' type instead. This could be
-- much more efficient due to stream fusion.
-- /Internal/
{-# INLINE refoldIterateM #-}
refoldIterateM :: Monad m =>
Refold m b a b -> m b -> Stream m a -> Stream m b
refoldIterateM c i m = fromStreamD $ D.refoldIterateM c i (toStreamD m)
-- | Binary BFS style reduce, folds a level entirely using the supplied fold
-- function, collecting the outputs as next level of the tree, then repeats the
-- same process on the next level. The last elements of a previously folded
-- level are folded first.
{-# INLINE reduceIterateBfs #-}
reduceIterateBfs :: Monad m =>
(a -> a -> m a) -> Stream m a -> m (Maybe a)
reduceIterateBfs f stream = D.reduceIterateBfs f (toStreamD stream)
-- Splitting
-- Implement this as a fold or a parser instead.
-- This can be implemented easily using Rabin Karp
-- | Split post any one of the given patterns.
-- /Unimplemented/
{-# INLINE splitOnSuffixSeqAny #-}
splitOnSuffixSeqAny :: -- (Monad m, Unboxed a, Integral a) =>
[Array a] -> Fold m a b -> Stream m a -> Stream m b
splitOnSuffixSeqAny _subseq _f _m = undefined
-- D.fromStreamD $ D.splitPostAny f subseq (D.toStreamD m)
-- | Split on a prefixed separator element, dropping the separator. The
-- supplied 'Fold' is applied on the split segments.
-- @
-- > splitOnPrefix' p xs = Stream.toList $ Stream.splitOnPrefix p (Fold.toList) (Stream.fromList xs)
-- > splitOnPrefix' (== '.') ".a.b"
-- ["a","b"]
-- @
-- An empty stream results in an empty output stream:
-- @
-- > splitOnPrefix' (== '.') ""
-- []
-- @
-- An empty segment consisting of only a prefix is folded to the default output
-- of the fold:
-- @
-- > splitOnPrefix' (== '.') "."
-- [""]
-- > splitOnPrefix' (== '.') ".a.b."
-- ["a","b",""]
-- > splitOnPrefix' (== '.') ".a..b"
-- ["a","","b"]
-- @
-- A prefix is optional at the beginning of the stream:
-- @
-- > splitOnPrefix' (== '.') "a"
-- ["a"]
-- > splitOnPrefix' (== '.') "a.b"
-- ["a","b"]
-- @
-- 'splitOnPrefix' is an inverse of 'intercalatePrefix' with a single element:
-- > Stream.intercalatePrefix (Stream.fromPure '.') Unfold.fromList . Stream.splitOnPrefix (== '.') Fold.toList === id
-- Assuming the input stream does not contain the separator:
-- > Stream.splitOnPrefix (== '.') Fold.toList . Stream.intercalatePrefix (Stream.fromPure '.') Unfold.fromList === id
-- /Unimplemented/
{-# INLINE splitOnPrefix #-}
splitOnPrefix :: -- (IsStream t, MonadCatch m) =>
(a -> Bool) -> Fold m a b -> Stream m a -> Stream m b
splitOnPrefix _predicate _f = undefined
-- parseMany (Parser.sliceBeginBy predicate f)
-- Int list examples for splitOn:
-- >>> splitList [] [1,2,3,3,4]
-- > [[1],[2],[3],[3],[4]]
-- >>> splitList [5] [1,2,3,3,4]
-- > [[1,2,3,3,4]]
-- >>> splitList [1] [1,2,3,3,4]
-- > [[],[2,3,3,4]]
-- >>> splitList [4] [1,2,3,3,4]
-- > [[1,2,3,3],[]]
-- >>> splitList [2] [1,2,3,3,4]
-- > [[1],[3,3,4]]
-- >>> splitList [3] [1,2,3,3,4]
-- > [[1,2],[],[4]]
-- >>> splitList [3,3] [1,2,3,3,4]
-- > [[1,2],[4]]
-- >>> splitList [1,2,3,3,4] [1,2,3,3,4]
-- > [[],[]]
-- This can be implemented easily using Rabin Karp
-- | Split on any one of the given patterns.
-- /Unimplemented/
{-# INLINE splitOnAny #-}
splitOnAny :: -- (Monad m, Unboxed a, Integral a) =>
[Array a] -> Fold m a b -> Stream m a -> Stream m b
splitOnAny _subseq _f _m =
undefined -- D.fromStreamD $ D.splitOnAny f subseq (D.toStreamD m)
-- Parsing
-- | Apply a 'Parser' repeatedly on a stream and emit the parsed values in the
-- output stream.
-- Example:
-- >>> s = Stream.fromList [1..10]
-- >>> parser = Parser.takeBetween 0 2 Fold.sum
-- >>> Stream.fold Fold.toList $ Stream.parseMany parser s
-- [Right 3,Right 7,Right 11,Right 15,Right 19]
-- This is the streaming equivalent of the 'Streamly.Data.Parser.many' parse
-- combinator.
-- Known Issues: When the parser fails there is no way to get the remaining
-- stream.
{-# INLINE parseMany #-}
:: Monad m
=> Parser a m b
-> Stream m a
-> Stream m (Either ParseError b)
parseMany p m =
fromStreamD $ D.parseManyD p (toStreamD m)
-- | Same as parseMany but for StreamD streams.
-- /Internal/
{-# INLINE parseManyD #-}
:: Monad m
=> ParserD.Parser a m b
-> Stream m a
-> Stream m (Either ParseError b)
parseManyD p m =
fromStreamD $ D.parseManyD p (toStreamD m)
-- | Apply a stream of parsers to an input stream and emit the results in the
-- output stream.
-- /Unimplemented/
{-# INLINE parseSequence #-}
:: -- Monad m =>
Stream m (Parser a m b)
-> Stream m a
-> Stream m b
parseSequence _f _m = undefined
-- XXX Change the parser arguments' order
-- | @parseManyTill collect test stream@ tries the parser @test@ on the input,
-- if @test@ fails it backtracks and tries @collect@, after @collect@ succeeds
-- @test@ is tried again and so on. The parser stops when @test@ succeeds. The
-- output of @test@ is discarded and the output of @collect@ is emitted in the
-- output stream. The parser fails if @collect@ fails.
-- /Unimplemented/
{-# INLINE parseManyTill #-}
parseManyTill ::
-- MonadThrow m =>
Parser a m b
-> Parser a m x
-> t m a
-> t m b
parseManyTill = undefined
-- | Iterate a parser generating function on a stream. The initial value @b@ is
-- used to generate the first parser, the parser is applied on the stream and
-- the result is used to generate the next parser and so on.
-- >>> import Data.Monoid (Sum(..))
-- >>> s = Stream.fromList [1..10]
-- >>> Stream.fold Fold.toList $ fmap getSum $ Stream.catRights $ Stream.parseIterate (\b -> Parser.takeBetween 0 2 (Fold.sconcat b)) (Sum 0) $ fmap Sum s
-- [3,10,21,36,55,55]
-- This is the streaming equivalent of monad like sequenced application of
-- parsers where next parser is dependent on the previous parser.
-- /Pre-release/
{-# INLINE parseIterate #-}
:: Monad m
=> (b -> Parser a m b)
-> b
-> Stream m a
-> Stream m (Either ParseError b)
parseIterate f i m = fromStreamD $
D.parseIterateD f i (toStreamD m)
-- Chunking
-- | @chunksOf n stream@ groups the elements in the input stream into arrays of
-- @n@ elements each.
-- Same as the following but may be more efficient:
-- >>> chunksOf n = Stream.foldMany (Array.writeN n)
-- /Pre-release/
{-# INLINE chunksOf #-}
chunksOf :: (MonadIO m, Unbox a)
=> Int -> Stream m a -> Stream m (Array a)
chunksOf n = fromStreamD . Array.chunksOf n . toStreamD

-- stripSuffix on that especially if the elements have a Storable or Prim
-- instance.
-- See also "Streamly.Internal.Data.Stream.Reduce.dropSuffix".
-- See also "Streamly.Internal.Data.Stream.StreamD.Reduce.dropSuffix".
-- Space: @O(n)@, buffers the entire input stream as well as the suffix

View File

@ -1,52 +0,0 @@
-- |
-- Module : Streamly.Internal.Data.Stream.StreamDK
-- Copyright : (c) 2019 Composewell Technologies
-- License : BSD-3-Clause
-- Maintainer :
-- Stability : experimental
-- Portability : GHC
-- This module has the following problems due to rewrite rules:
-- * Rewrite rules lead to optimization problems, blocking fusion in some
-- cases, specifically when combining multiple operations e.g. (filter . drop).
-- * Rewrite rules lead to problems when calling a function recursively. For
-- example, the StreamD version of foldBreak cannot be used recursively when
-- wrapped in rewrite rules because each recursive call adds a roundtrip
-- conversion from D to K and back to D. We can use the StreamK versions of
-- these though because the rewrite rule gets eliminated in that case.
-- * If we have a unified module, we need two different versions of several
-- operations e.g. appendK and appendD, both are useful in different cases.
module Streamly.Internal.Data.Stream.StreamDK
( module Streamly.Internal.Data.Stream.Type
, module Streamly.Internal.Data.Stream.Bottom
, module Streamly.Internal.Data.Stream.Eliminate
, module Streamly.Internal.Data.Stream.Exception
, module Streamly.Internal.Data.Stream.Expand
, module Streamly.Internal.Data.Stream.Generate
, module Streamly.Internal.Data.Stream.Lift
, module Streamly.Internal.Data.Stream.Reduce
, module Streamly.Internal.Data.Stream.Transform
, module Streamly.Internal.Data.Stream.Cross
, module Streamly.Internal.Data.Stream.Zip
-- modules having dependencies on libraries other than base
, module Streamly.Internal.Data.Stream.Transformer
import Streamly.Internal.Data.Stream.Bottom
import Streamly.Internal.Data.Stream.Cross
import Streamly.Internal.Data.Stream.Eliminate
import Streamly.Internal.Data.Stream.Exception
import Streamly.Internal.Data.Stream.Expand
import Streamly.Internal.Data.Stream.Generate
import Streamly.Internal.Data.Stream.Lift
import Streamly.Internal.Data.Stream.Reduce
import Streamly.Internal.Data.Stream.Transform
import Streamly.Internal.Data.Stream.Type
import Streamly.Internal.Data.Stream.Zip
import Streamly.Internal.Data.Stream.Transformer

@ -1,135 +0,0 @@
-- |
-- Module : Streamly.Internal.Data.Stream.Transformer
-- Copyright : (c) 2019 Composewell Technologies
-- License : BSD-3-Clause
-- Maintainer :
-- Stability : experimental
-- Portability : GHC
module Streamly.Internal.Data.Stream.Transformer
, foldrT
, liftInner
, usingReaderT
, runReaderT
, evalStateT
, usingStateT
, runStateT
import Control.Monad.Trans.Class (MonadTrans)
import Control.Monad.Trans.Reader (ReaderT)
import Control.Monad.Trans.State.Strict (StateT)
import Streamly.Internal.Data.Stream.Type (Stream, fromStreamD, toStreamD)
import qualified Streamly.Internal.Data.Stream.StreamD.Transformer as D
-- $setup
-- >>> :m
-- >>> import Control.Monad.Trans.Class (lift)
-- >>> import Control.Monad.Trans.Identity (runIdentityT)
-- >>> import qualified Streamly.Internal.Data.Stream as Stream
-- | Lazy left fold to a transformer monad.
{-# INLINE foldlT #-}
foldlT :: (Monad m, Monad (s m), MonadTrans s)
=> (s m b -> a -> s m b) -> s m b -> Stream m a -> s m b
foldlT f z s = D.foldlT f z (toStreamD s)
-- | Right fold to a transformer monad. This is the most general right fold
-- function. 'foldrS' is a special case of 'foldrT', however 'foldrS'
-- implementation can be more efficient:
-- >>> foldrS = Stream.foldrT
-- >>> step f x xs = lift $ f x (runIdentityT xs)
-- >>> foldrM f z s = runIdentityT $ Stream.foldrT (step f) (lift z) s
-- 'foldrT' can be used to translate streamly streams to other transformer
-- monads e.g. to a different streaming type.
-- /Pre-release/
{-# INLINE foldrT #-}
foldrT :: (Monad m, Monad (s m), MonadTrans s)
=> (a -> s m b -> s m b) -> s m b -> Stream m a -> s m b
foldrT f z s = D.foldrT f z (toStreamD s)
-- Add and remove a monad transformer
-- | Lift the inner monad @m@ of @Stream m a@ to @t m@ where @t@ is a monad
-- transformer.
{-# INLINE liftInner #-}
liftInner :: (Monad m, MonadTrans t, Monad (t m))
=> Stream m a -> Stream (t m) a
liftInner xs = fromStreamD $ D.liftInner (toStreamD xs)
-- Sharing read only state in a stream
-- | Evaluate the inner monad of a stream as 'ReaderT'.
{-# INLINE runReaderT #-}
runReaderT :: Monad m => m s -> Stream (ReaderT s m) a -> Stream m a
runReaderT s xs = fromStreamD $ D.runReaderT s (toStreamD xs)
-- | Run a stream transformation using a given environment.
-- See also: ''
-- / Internal/
{-# INLINE usingReaderT #-}
:: Monad m
=> m r
-> (Stream (ReaderT r m) a -> Stream (ReaderT r m) a)
-> Stream m a
-> Stream m a
usingReaderT r f xs = runReaderT r $ f $ liftInner xs
-- Sharing read write state in a stream
-- | Evaluate the inner monad of a stream as 'StateT'.
-- >>> evalStateT s = fmap snd . Stream.runStateT s
-- / Internal/
{-# INLINE evalStateT #-}
evalStateT :: Monad m => m s -> Stream (StateT s m) a -> Stream m a
-- evalStateT s = fmap snd . runStateT s
evalStateT s xs = fromStreamD $ D.evalStateT s (toStreamD xs)
-- | Run a stateful (StateT) stream transformation using a given state.
-- >>> usingStateT s f = Stream.evalStateT s . f . Stream.liftInner
-- See also: 'scan'
-- / Internal/
{-# INLINE usingStateT #-}
:: Monad m
=> m s
-> (Stream (StateT s m) a -> Stream (StateT s m) a)
-> Stream m a
-> Stream m a
usingStateT s f = evalStateT s . f . liftInner
-- | Evaluate the inner monad of a stream as 'StateT' and emit the resulting
-- state and value pair after each step.
{-# INLINE runStateT #-}
runStateT :: Monad m => m s -> Stream (StateT s m) a -> Stream m (s, a)
runStateT s xs = fromStreamD $ D.runStateT s (toStreamD xs)

{-# LANGUAGE UndecidableInstances #-}
-- |
-- Module : Streamly.Internal.Data.Stream.Type
-- Copyright : (c) 2017 Composewell Technologies
-- License : BSD-3-Clause
-- Maintainer :
-- Stability : experimental
-- Portability : GHC
module Streamly.Internal.Data.Stream.Type
-- * Stream Type
Stream -- XXX To be removed
, StreamK
-- * Type Conversion
, fromStreamK
, toStreamK
, fromStreamD
, toStreamD
, fromStream
, toStream
, Streamly.Internal.Data.Stream.Type.fromList
-- * Construction
, cons
, consM
, nil
, nilM
, fromPure
, fromEffect
-- * Applicative
, crossApply
, crossApplySnd
, crossApplyFst
, crossWith
, cross
-- * Bind/Concat
, bindWith
, concatMapWith
-- * Double folds
, eqBy
, cmpBy
#include "inline.hs"
import Control.Applicative (liftA2)
import Data.Foldable (Foldable(foldl'), fold)
import Data.Functor.Identity (Identity(..), runIdentity)
import Data.Maybe (fromMaybe)
import Data.Semigroup (Endo(..))
import GHC.Exts (IsList(..), IsString(..), oneShot)
import Streamly.Internal.BaseCompat ((#.))
import Streamly.Internal.Data.Maybe.Strict (Maybe'(..), toMaybe)
import Text.Read
( Lexeme(Ident), lexP, parens, prec, readPrec, readListPrec
, readListPrecDefault)
import qualified Streamly.Internal.Data.Stream.Common as P
import qualified Streamly.Internal.Data.Stream.StreamD.Type as D
import qualified Streamly.Internal.Data.Stream.StreamK.Type as K
-- $setup
-- >>> import qualified Streamly.Data.Fold as Fold
-- >>> import qualified Streamly.Internal.Data.Unfold as Unfold
-- >>> import qualified Streamly.Internal.Data.Stream as Stream
-- Stream
-- | Semigroup instance appends two streams:
-- >>> (<>) = Stream.append
newtype StreamK m a = StreamK (K.StreamK m a)
-- XXX when deriving do we inherit an INLINE?
deriving (Semigroup, Monoid)
type Stream = StreamK
-- Conversions
{-# INLINE_EARLY fromStreamK #-}
fromStreamK :: K.StreamK m a -> Stream m a
fromStreamK = StreamK
{-# INLINE_EARLY toStreamK #-}
toStreamK :: Stream m a -> K.StreamK m a
toStreamK (StreamK k) = k
{-# INLINE_EARLY fromStreamD #-}
fromStreamD :: Monad m => D.Stream m a -> Stream m a
fromStreamD = fromStreamK . D.toStreamK
{-# INLINE_EARLY toStreamD #-}
toStreamD :: Applicative m => Stream m a -> D.Stream m a
toStreamD = D.fromStreamK . toStreamK
{-# INLINE fromStream #-}
fromStream :: Monad m => D.Stream m a -> Stream m a
fromStream = fromStreamD
{-# INLINE toStream #-}
toStream :: Applicative m => Stream m a -> D.Stream m a
toStream = toStreamD
-- Generation
-- |
-- >>> fromList = Prelude.foldr Stream.cons Stream.nil
-- Construct a stream from a list of pure values. This is more efficient than
-- 'fromFoldable'.
{-# INLINE fromList #-}
fromList :: Monad m => [a] -> Stream m a
fromList = fromStreamK . P.fromList
-- Comparison
-- | Compare two streams for equality
{-# INLINE eqBy #-}
eqBy :: Monad m =>
(a -> b -> Bool) -> Stream m a -> Stream m b -> m Bool
eqBy f m1 m2 = D.eqBy f (toStreamD m1) (toStreamD m2)
-- | Compare two streams
{-# INLINE cmpBy #-}
:: Monad m
=> (a -> b -> Ordering) -> Stream m a -> Stream m b -> m Ordering
cmpBy f m1 m2 = D.cmpBy f (toStreamD m1) (toStreamD m2)
-- Functor
instance Monad m => Functor (Stream m) where
{-# INLINE fmap #-}
-- IMPORTANT: do not use eta reduction.
fmap f m = fromStreamD $ D.mapM (return . f) $ toStreamD m
{-# INLINE (<$) #-}
(<$) = fmap . const
-- Lists
-- Serial streams can act like regular lists using the Identity monad
-- XXX Show instance is 10x slower compared to read, we can do much better.
-- The list show instance itself is really slow.
-- XXX The default definitions of "<" in the Ord instance etc. do not perform
-- well, because they do not get inlined. Need to add INLINE in Ord class in
-- base?
instance IsList (Stream Identity a) where
type (Item (Stream Identity a)) = a
{-# INLINE fromList #-}
fromList xs = StreamK $ P.fromList xs
{-# INLINE toList #-}
toList (StreamK xs) = runIdentity $ P.toList xs
instance Eq a => Eq (Stream Identity a) where
{-# INLINE (==) #-}
(==) (StreamK xs) (StreamK ys) = runIdentity $ P.eqBy (==) xs ys
instance Ord a => Ord (Stream Identity a) where
{-# INLINE compare #-}
compare (StreamK xs) (StreamK ys) = runIdentity $ P.cmpBy compare xs ys
{-# INLINE (<) #-}
x < y =
case compare x y of
LT -> True
_ -> False
{-# INLINE (<=) #-}
x <= y =
case compare x y of
GT -> False
_ -> True
{-# INLINE (>) #-}
x > y =
case compare x y of
GT -> True
_ -> False
{-# INLINE (>=) #-}
x >= y =
case compare x y of
LT -> False
_ -> True
{-# INLINE max #-}
max x y = if x <= y then y else x
{-# INLINE min #-}
min x y = if x <= y then x else y
instance Show a => Show (Stream Identity a) where
showsPrec p dl = showParen (p > 10) $
showString "fromList " . shows (toList dl)
instance Read a => Read (Stream Identity a) where
readPrec = parens $ prec 10 $ do
Ident "fromList" <- lexP
Streamly.Internal.Data.Stream.Type.fromList <$> readPrec
readListPrec = readListPrecDefault
instance (a ~ Char) => IsString (Stream Identity a) where
{-# INLINE fromString #-}
fromString xs = StreamK $ P.fromList xs
-- Foldable
-- The default Foldable instance has several issues:
-- 1) several definitions do not have INLINE on them, so we provide
-- re-implementations with INLINE pragmas.
-- 2) the definitions of sum/product/maximum/minimum are inefficient as they
-- use right folds, they cannot run in constant memory. We provide
-- implementations using strict left folds here.
instance (Foldable m, Monad m) => Foldable (Stream m) where
{-# INLINE foldMap #-}
foldMap f (StreamK xs) = fold $ P.foldr (mappend . f) mempty xs
{-# INLINE foldr #-}
foldr f z t = appEndo (foldMap (Endo #. f) t) z
{-# INLINE foldl' #-}
foldl' f z0 xs = foldr f' id xs z0
where f' x k = oneShot $ \z -> k $! f z x
{-# INLINE length #-}
length = foldl' (\n _ -> n + 1) 0
{-# INLINE elem #-}
elem = any . (==)
{-# INLINE maximum #-}
maximum =
fromMaybe (errorWithoutStackTrace "maximum: empty stream")
. toMaybe
. foldl' getMax Nothing'
getMax Nothing' x = Just' x
getMax (Just' mx) x = Just' $! max mx x
{-# INLINE minimum #-}
minimum =
fromMaybe (errorWithoutStackTrace "minimum: empty stream")
. toMaybe
. foldl' getMin Nothing'
getMin Nothing' x = Just' x
getMin (Just' mn) x = Just' $! min mn x
{-# INLINE sum #-}
sum = foldl' (+) 0
{-# INLINE product #-}
product = foldl' (*) 1
-- Traversable
instance Traversable (Stream Identity) where
{-# INLINE traverse #-}
traverse f (StreamK xs) =
fmap StreamK $ runIdentity $ P.foldr consA (pure mempty) xs
consA x ys = liftA2 K.cons (f x) ys
-- Construction
infixr 5 `cons`
-- | A right associative prepend operation to add a pure value at the head of
-- an existing stream::
-- >>> s = 1 `Stream.cons` 2 `Stream.cons` 3 `Stream.cons` Stream.nil
-- >>> Stream.fold Fold.toList s
-- [1,2,3]
-- It can be used efficiently with 'Prelude.foldr':
-- >>> fromFoldable = Prelude.foldr Stream.cons Stream.nil
-- Same as the following but more efficient:
-- >>> cons x xs = return x `Stream.consM` xs
-- /CPS/
{-# INLINE_NORMAL cons #-}
cons :: a -> Stream m a -> Stream m a
cons x = fromStreamK . K.cons x . toStreamK
infixr 5 `consM`
-- | A right associative prepend operation to add an effectful value at the
-- head of an existing stream::
-- >>> s = putStrLn "hello" `consM` putStrLn "world" `consM` Stream.nil
-- >>> Stream.fold Fold.drain s
-- hello
-- world
-- It can be used efficiently with 'Prelude.foldr':
-- >>> fromFoldableM = Prelude.foldr Stream.consM Stream.nil
-- Same as the following but more efficient:
-- >>> consM x xs = Stream.fromEffect x `Stream.append` xs
-- /CPS/
{-# INLINE consM #-}
{-# SPECIALIZE consM :: IO a -> Stream IO a -> Stream IO a #-}
consM :: Monad m => m a -> Stream m a -> Stream m a
consM m = fromStreamK . K.consM m . toStreamK
-- | A stream that terminates without producing any output or side effect.
-- >>> Stream.fold Fold.toList Stream.nil
-- []
{-# INLINE_NORMAL nil #-}
nil :: Stream m a
nil = fromStreamK K.nil
-- | A stream that terminates without producing any output, but produces a side
-- effect.
-- >>> Stream.fold Fold.toList (Stream.nilM (print "nil"))
-- "nil"
-- []
-- /Pre-release/
{-# INLINE_NORMAL nilM #-}
nilM :: Monad m => m b -> Stream m a
nilM = fromStreamK . K.nilM
-- | Create a singleton stream from a pure value.
-- >>> fromPure a = a `cons` Stream.nil
-- >>> fromPure = pure
-- >>> fromPure = fromEffect . pure
{-# INLINE_NORMAL fromPure #-}
fromPure :: a -> Stream m a
fromPure = fromStreamK . K.fromPure
-- | Create a singleton stream from a monadic action.
-- >>> fromEffect m = m `consM` Stream.nil
-- >>> fromEffect = Stream.sequence . Stream.fromPure
-- >>> Stream.fold Fold.drain $ Stream.fromEffect (putStrLn "hello")
-- hello
{-# INLINE_NORMAL fromEffect #-}
fromEffect :: Monad m => m a -> Stream m a
fromEffect = fromStreamK . K.fromEffect
-- Applicative
-- | Apply a stream of functions to a stream of values and flatten the results.
-- Note that the second stream is evaluated multiple times.
-- >>> crossApply = Stream.crossWith id
{-# INLINE crossApply #-}
crossApply :: Stream m (a -> b) -> Stream m a -> Stream m b
crossApply m1 m2 =
fromStreamK $ K.crossApply (toStreamK m1) (toStreamK m2)
{-# INLINE crossApplySnd #-}
crossApplySnd :: Stream m a -> Stream m b -> Stream m b
crossApplySnd m1 m2 =
fromStreamK $ K.crossApplySnd (toStreamK m1) (toStreamK m2)
{-# INLINE crossApplyFst #-}
crossApplyFst :: Stream m a -> Stream m b -> Stream m a
crossApplyFst m1 m2 =
fromStreamK $ K.crossApplyFst (toStreamK m1) (toStreamK m2)
-- |
-- Definition:
-- >>> crossWith f m1 m2 = fmap f m1 `Stream.crossApply` m2
-- Note that the second stream is evaluated multiple times.
{-# INLINE crossWith #-}
crossWith :: Monad m => (a -> b -> c) -> Stream m a -> Stream m b -> Stream m c
crossWith f m1 m2 = fmap f m1 `crossApply` m2
-- | Given a @Stream m a@ and @Stream m b@ generate a stream with all possible
-- combinations of the tuple @(a, b)@.
-- Definition:
-- >>> cross = Stream.crossWith (,)
-- The second stream is evaluated multiple times. If that is not desired it can
-- be cached in an 'Data.Array.Array' and then generated from the array before
-- calling this function. Caching may also improve performance if the stream is
-- expensive to evaluate.
-- See 'Streamly.Internal.Data.Unfold.cross' for a much faster fused
-- alternative.
-- Time: O(m x n)
-- /Pre-release/
{-# INLINE cross #-}
cross :: Monad m => Stream m a -> Stream m b -> Stream m (a, b)
cross = crossWith (,)
-- Bind/Concat
-- |
-- /CPS/
{-# INLINE bindWith #-}
:: (Stream m b -> Stream m b -> Stream m b)
-> Stream m a
-> (a -> Stream m b)
-> Stream m b
bindWith par m1 f =
$ K.bindWith
(\s1 s2 -> toStreamK $ par (fromStreamK s1) (fromStreamK s2))
(toStreamK m1)
(toStreamK . f)
-- | @concatMapWith mixer generator stream@ is a two dimensional looping
-- combinator. The @generator@ function is used to generate streams from the
-- elements in the input @stream@ and the @mixer@ function is used to merge
-- those streams.
-- /CPS/
{-# INLINE concatMapWith #-}
:: (Stream m b -> Stream m b -> Stream m b)
-> (a -> Stream m b)
-> Stream m a
-> Stream m b
concatMapWith par f xs = bindWith par xs f

{-# LANGUAGE UndecidableInstances #-}
-- |
-- Module : Streamly.Internal.Data.Stream.Zip
-- Copyright : (c) 2017 Composewell Technologies
-- License : BSD3
-- Maintainer :
-- Stability : experimental
-- Portability : GHC
-- To run examples in this module:
-- >>> import qualified Streamly.Data.Fold as Fold
-- >>> import qualified Streamly.Data.Stream as Stream
-- >>> import qualified Streamly.Internal.Data.Stream.Zip as Stream
module Streamly.Internal.Data.Stream.Zip
ZipStream (..)
, ZipSerialM
, ZipSerial
import Data.Functor.Identity (Identity(..))
import GHC.Exts (IsList(..), IsString(..))
import Streamly.Internal.Data.Stream.Type (Stream)
import Text.Read
( Lexeme(Ident), lexP, parens, prec, readPrec, readListPrec
, readListPrecDefault)
import qualified Streamly.Internal.Data.Stream.Bottom as Stream
import qualified Streamly.Internal.Data.Stream.Generate as Stream
-- $setup
-- >>> import qualified Streamly.Data.Fold as Fold
-- >>> import qualified Streamly.Data.Stream as Stream
-- >>> import qualified Streamly.Internal.Data.Stream.Zip as Stream
-- Serially Zipping Streams
-- | For 'ZipStream':
-- @
-- (<>) = 'Streamly.Data.Stream.append'
-- (\<*>) = 'Streamly.Data.Stream.zipWith' id
-- @
-- Applicative evaluates the streams being zipped serially:
-- >>> s1 = Stream.ZipStream $ Stream.fromFoldable [1, 2]
-- >>> s2 = Stream.ZipStream $ Stream.fromFoldable [3, 4]
-- >>> s3 = Stream.ZipStream $ Stream.fromFoldable [5, 6]
-- >>> s = (,,) <$> s1 <*> s2 <*> s3
-- >>> Stream.fold Fold.toList (Stream.unZipStream s)
-- [(1,3,5),(2,4,6)]
newtype ZipStream m a = ZipStream {unZipStream :: Stream m a}
deriving (Functor, Semigroup, Monoid)
deriving instance IsList (ZipStream Identity a)
deriving instance (a ~ Char) => IsString (ZipStream Identity a)
deriving instance Eq a => Eq (ZipStream Identity a)
deriving instance Ord a => Ord (ZipStream Identity a)
deriving instance (Foldable m, Monad m) => Foldable (ZipStream m)
deriving instance Traversable (ZipStream Identity)
instance Show a => Show (ZipStream Identity a) where
showsPrec p dl = showParen (p > 10) $
showString "fromList " . shows (toList dl)
instance Read a => Read (ZipStream Identity a) where
readPrec = parens $ prec 10 $ do
Ident "fromList" <- lexP
fromList <$> readPrec
readListPrec = readListPrecDefault
type ZipSerialM = ZipStream
-- | An IO stream whose applicative instance zips streams serially.
type ZipSerial = ZipSerialM IO
instance Monad m => Applicative (ZipStream m) where
pure = ZipStream . Stream.repeat
{-# INLINE (<*>) #-}
ZipStream m1 <*> ZipStream m2 = ZipStream $ Stream.zipWith id m1 m2

-- * Unfolds
-- One to one correspondence with
-- "Streamly.Internal.Data.Stream.Generate"
-- "Streamly.Internal.Data.Stream.StreamD.Generate"
-- ** Basic Constructors
, mkUnfoldM
, mkUnfoldrM

View File

@ -422,23 +422,7 @@ library
if flag(dev)
, Streamly.Internal.Data.Stream.Type
, Streamly.Internal.Data.Stream.Eliminate
, Streamly.Internal.Data.Stream.Enumerate
, Streamly.Internal.Data.Stream.Generate
, Streamly.Internal.Data.Stream.Transform
, Streamly.Internal.Data.Stream.Bottom
, Streamly.Internal.Data.Stream.Exception
, Streamly.Internal.Data.Stream.Expand
, Streamly.Internal.Data.Stream.Lift
, Streamly.Internal.Data.Stream.Reduce
, Streamly.Internal.Data.Stream.Transformer
, Streamly.Internal.Data.Stream.StreamDK
, Streamly.Internal.Data.Stream.Zip
, Streamly.Internal.Data.Stream.Cross
, Streamly.Internal.Data.List
, Streamly.Data.Stream.Zip
--, Streamly.Internal.Data.Parser.ParserDK
-- streamly-base