Add unfoldWords/interpose/intercalate etc.

* Rearrange, update benchmarks for words/unwords, lines/unlines
* Add interpose/intercalate/interleaveInfix/interleaveSuffix to Prelude
* Add unfoldWords to Data.String

Use the newly added routines to write words/unwords and lines/unlines more
idiomatically and with improved performance.
This commit is contained in:
Harendra Kumar 2019-10-15 00:34:40 +05:30
parent fbec11f24d
commit a9f3f9a0fb
9 changed files with 644 additions and 97 deletions

View File

@ -201,20 +201,32 @@ main = do
Handles inh _ <- readIORef href
BFS.chunksOf 1000 inh
]
, bgroup "group-ungroup"
[ mkBench "lines-unlines" href $ do
, bgroup "group-ungroup-stream"
[ mkBench "lines-unlines-[Char]" href $ do
Handles inh outh <- readIORef href
BFS.linesUnlinesCopy inh outh
, mkBench "lines-unlines-arrays" href $ do
, mkBench "lines-unlines-Word8Array" href $ do
Handles inh outh <- readIORef href
BFA.linesUnlinesCopy inh outh
, mkBench "words-unwords" href $ do
BFS.linesUnlinesArrayWord8Copy inh outh
, mkBench "lines-unlines-CharArray" href $ do
Handles inh outh <- readIORef href
BFS.wordsUnwordsCopy inh outh
, mkBench "words-unwords-word8" href $ do
BFS.linesUnlinesArrayCharCopy inh outh
, mkBench "words-unwords-[Word8]" href $ do
Handles inh outh <- readIORef href
BFS.wordsUnwordsCopyWord8 inh outh
, mkBench "words-unwords-arrays" href $ do
, mkBench "words-unwords-[Char]" href $ do
Handles inh outh <- readIORef href
BFS.wordsUnwordsCopy inh outh
, mkBench "words-unwords-CharArray" href $ do
Handles inh outh <- readIORef href
BFS.wordsUnwordsCharArrayCopy inh outh
]
, bgroup "group-ungroup-array-stream"
[ mkBench "lines-unlines-Word8Array" href $ do
Handles inh outh <- readIORef href
BFA.linesUnlinesCopy inh outh
, mkBench "words-unwords-Word8Array" href $ do
Handles inh outh <- readIORef href
BFA.wordsUnwordsCopy inh outh
]

View File

@ -48,7 +48,6 @@ import qualified Streamly.Prelude as S
import qualified Streamly.Data.String as SS
import qualified Streamly.Internal.FileSystem.Handle as IFH
import qualified Streamly.Internal.Prelude as Internal
import qualified Streamly.Internal.Memory.Array as IA
import qualified Streamly.Internal.Memory.ArrayStream as AS
import qualified Streamly.Internal.Data.Unfold as IUF
@ -186,8 +185,8 @@ inspect $ 'copy `hasNoType` ''Step
{-# INLINE linesUnlinesCopy #-}
linesUnlinesCopy :: Handle -> Handle -> IO ()
linesUnlinesCopy inh outh =
S.fold (IFH.writeArraysInChunksOf (1024*1024) outh)
$ Internal.insertAfterEach (return $ A.fromList [10])
S.fold (IFH.writeInChunksOf (1024*1024) outh)
$ AS.interposeSuffix 10
$ AS.splitOnSuffix 10
$ IFH.toStreamArraysOf (1024*1024) inh
@ -200,9 +199,9 @@ inspect $ hasNoTypeClassesExcept 'linesUnlinesCopy [''Storable]
{-# INLINE wordsUnwordsCopy #-}
wordsUnwordsCopy :: Handle -> Handle -> IO ()
wordsUnwordsCopy inh outh =
S.fold (IFH.writeArraysInChunksOf (1024*1024) outh)
$ S.intersperse (A.fromList [32])
-- XXX use a word splitting combinator
S.fold (IFH.writeInChunksOf (1024*1024) outh)
$ AS.interpose 32
-- XXX this is not the correct word splitting combinator
$ AS.splitOn 32
$ IFH.toStreamArraysOf (1024*1024) inh

View File

@ -40,8 +40,12 @@ module Streamly.Benchmark.FileIO.Stream
, catFinallyStream
, copy
, linesUnlinesCopy
, linesUnlinesArrayWord8Copy
, linesUnlinesArrayCharCopy
-- , linesUnlinesArrayUtf8Copy
, wordsUnwordsCopyWord8
, wordsUnwordsCopy
, wordsUnwordsCharArrayCopy
, readWord8
, decodeChar8
, copyCodecChar8
@ -68,9 +72,11 @@ import Prelude hiding (last, length)
import qualified Streamly.FileSystem.Handle as FH
import qualified Streamly.Internal.FileSystem.Handle as IFH
import qualified Streamly.Memory.Array as A
-- import qualified Streamly.Internal.Memory.Array as IA
import qualified Streamly.Internal.Memory.Array.Types as AT
import qualified Streamly.Prelude as S
import qualified Streamly.Data.Fold as FL
-- import qualified Streamly.Internal.Data.Fold as IFL
import qualified Streamly.Data.String as SS
import qualified Streamly.Internal.Data.Unfold as IUF
import qualified Streamly.Internal.Prelude as IP
@ -400,11 +406,29 @@ inspect $ 'chunksOfD `hasNoType` ''AT.FlattenState
inspect $ 'chunksOfD `hasNoType` ''D.ConcatMapUState
#endif
-- XXX splitSuffixOn requires -funfolding-use-threshold=150 for better fusion
-- | Lines and unlines
{-# INLINE linesUnlinesCopy #-}
linesUnlinesCopy :: Handle -> Handle -> IO ()
linesUnlinesCopy inh outh =
S.fold (FH.write outh)
$ SS.encodeChar8
$ SS.unfoldLines IUF.fromList
$ S.splitOnSuffix (== '\n') FL.toList
$ SS.decodeChar8
$ S.unfold FH.read inh
-- | Copy the input handle to the output handle one line at a time. The byte
-- stream read from @inh@ is split on the newline byte into arrays, and each
-- array is then unfolded back into bytes with a newline byte appended after
-- it before being written to @outh@.
{-# INLINE linesUnlinesArrayWord8Copy #-}
linesUnlinesArrayWord8Copy :: Handle -> Handle -> IO ()
linesUnlinesArrayWord8Copy inh outh =
    S.fold (FH.write outh)
        . IP.interposeSuffix lf A.read
        . S.splitOnSuffix (== lf) A.write
        $ S.unfold FH.read inh
-- XXX splitSuffixOn requires -funfolding-use-threshold=150 for better fusion
-- | Lines and unlines
{-# INLINE linesUnlinesArrayCharCopy #-}
linesUnlinesArrayCharCopy :: Handle -> Handle -> IO ()
linesUnlinesArrayCharCopy inh outh =
S.fold (FH.write outh)
$ SS.encodeChar8
$ SS.unlines
@ -413,12 +437,26 @@ linesUnlinesCopy inh outh =
$ S.unfold FH.read inh
#ifdef INSPECTION
inspect $ hasNoTypeClassesExcept 'linesUnlinesCopy [''Storable]
-- inspect $ 'linesUnlinesCopy `hasNoType` ''Step
-- inspect $ 'linesUnlinesCopy `hasNoType` ''AT.FlattenState
-- inspect $ 'linesUnlinesCopy `hasNoType` ''D.ConcatMapUState
inspect $ hasNoTypeClassesExcept 'linesUnlinesArrayCharCopy [''Storable]
-- inspect $ 'linesUnlinesArrayCharCopy `hasNoType` ''Step
-- inspect $ 'linesUnlinesArrayCharCopy `hasNoType` ''AT.FlattenState
-- inspect $ 'linesUnlinesArrayCharCopy `hasNoType` ''D.ConcatMapUState
#endif
-- XXX to write this we need to be able to map decodeUtf8 on the A.read fold.
-- For that we have to write decodeUtf8 as a Pipe.
{-
{-# INLINE linesUnlinesArrayUtf8Copy #-}
linesUnlinesArrayUtf8Copy :: Handle -> Handle -> IO ()
linesUnlinesArrayUtf8Copy inh outh =
S.fold (FH.write outh)
$ SS.encodeChar8
$ IP.intercalate (A.fromList [10]) (pipe SS.decodeUtf8P A.read)
$ S.splitOnSuffix (== '\n') (IFL.lmap SS.encodeUtf8 A.write)
$ SS.decodeChar8
$ S.unfold FH.read inh
-}
foreign import ccall unsafe "u_iswspace"
iswspace :: Int -> Int
@ -446,8 +484,7 @@ isSp = isSpace . chr . fromIntegral
wordsUnwordsCopyWord8 :: Handle -> Handle -> IO ()
wordsUnwordsCopyWord8 inh outh =
S.fold (FH.write outh)
$ S.concatUnfold IUF.fromList
$ S.intersperse [32]
$ IP.interposeSuffix 32 IUF.fromList
$ S.wordsBy isSp FL.toList
$ S.unfold FH.read inh
@ -463,14 +500,7 @@ wordsUnwordsCopy :: Handle -> Handle -> IO ()
wordsUnwordsCopy inh outh =
S.fold (FH.write outh)
$ SS.encodeChar8
$ S.concatUnfold IUF.fromList
$ S.intersperse " "
-- Array allocation is too expensive for such small strings. So just use
-- lists instead.
--
-- -- $ SS.unwords
-- -- $ SS.words
--
$ SS.unfoldWords IUF.fromList
-- XXX This pipeline does not fuse with wordsBy but fuses with splitOn
-- with -funfolding-use-threshold=300. With wordsBy it does not fuse
-- even with high limits for inlining and spec-constr ghc options. With
@ -490,6 +520,16 @@ wordsUnwordsCopy inh outh =
-- inspect $ 'wordsUnwordsCopy `hasNoType` ''D.ConcatMapUState
#endif
-- | Copy the input handle to the output handle one word at a time, using
-- @Array Char@ as the intermediate representation of a word. Bytes read from
-- @inh@ are decoded with 'SS.decodeChar8', split by 'SS.words', rejoined by
-- 'SS.unwords', encoded with 'SS.encodeChar8' and written to @outh@.
{-# INLINE wordsUnwordsCharArrayCopy #-}
wordsUnwordsCharArrayCopy :: Handle -> Handle -> IO ()
wordsUnwordsCharArrayCopy inh outh =
    S.fold (FH.write outh)
        . SS.encodeChar8
        . SS.unwords
        . SS.words
        . SS.decodeChar8
        $ S.unfold FH.read inh
lf :: Word8
lf = fromIntegral (ord '\n')

View File

@ -74,6 +74,7 @@ module Streamly.Data.String
, foldLines
, foldWords
, unfoldLines
, unfoldWords
-- * Streams of Strings
, lines
@ -95,9 +96,7 @@ import Streamly.Internal.Data.Unfold (Unfold)
import qualified Streamly.Internal.Prelude as S
import qualified Streamly.Memory.Array as A
import qualified Streamly.Internal.Memory.ArrayStream as AS
import qualified Streamly.Streams.StreamD as D
import qualified Streamly.Internal.Data.Unfold as UF
-- type String = List Char
@ -269,11 +268,9 @@ words = foldWords A.write
-- | Unfold a stream to character streams using the supplied 'Unfold'
-- and concat the results suffixing a newline character @\\n@ to each stream.
--
-- > unfoldLines = S.intercalateSuffix UF.identity '\n'
--
{-# INLINE unfoldLines #-}
unfoldLines :: (MonadIO m, IsStream t) => Unfold m a Char -> t m a -> t m Char
unfoldLines unf = S.intercalateSuffix UF.identity '\n' unf
unfoldLines = S.interposeSuffix '\n'
-- | Flattens the stream of @Array Char@, after appending a terminating
-- newline to each string.
@ -292,6 +289,14 @@ unfoldLines unf = S.intercalateSuffix UF.identity '\n' unf
unlines :: (MonadIO m, IsStream t) => t m (Array Char) -> t m Char
unlines = unfoldLines A.read
-- | Unfold the elements of a stream to character streams using the supplied
-- 'Unfold' and concat the results with a single space character @' '@ infixed
-- between the streams.
--
-- The separator is placed /between/ the unfolded streams, in contrast to
-- 'unfoldLines' which suffixes a newline after each one.
--
{-# INLINE unfoldWords #-}
unfoldWords :: (MonadIO m, IsStream t) => Unfold m a Char -> t m a -> t m Char
unfoldWords = S.interpose ' '
-- | Flattens the stream of @Array Char@, after appending a separating
-- space to each string.
--
@ -300,12 +305,11 @@ unlines = unfoldLines A.read
-- >>> S.toList $ unwords $ S.fromList ["unwords", "this", "string"]
-- "unwords this string"
--
--
-- > unwords = A.concat . (S.intersperse (A.fromList " "))
-- > unwords = unfoldWords A.read
--
-- Note that, in general
--
-- > unwords . words /= id
{-# INLINE unwords #-}
unwords :: (MonadAsync m, IsStream t) => t m (Array Char) -> t m Char
unwords = AS.concat . (S.intersperse (A.fromList " "))
unwords = unfoldWords A.read

View File

@ -83,6 +83,7 @@ module Streamly.Internal.Data.Unfold
, effect
, singleton
, identity
, const
, replicateM
, fromList
, fromListM
@ -119,7 +120,7 @@ where
import Control.Exception (Exception)
import Data.Void (Void)
import GHC.Types (SPEC(..))
import Prelude hiding (concat, map, takeWhile, take, filter)
import Prelude hiding (concat, map, takeWhile, take, filter, const)
import Streamly.Streams.StreamD.Type (Stream(..), Step(..))
#if __GLASGOW_HASKELL__ < 800
@ -130,6 +131,7 @@ import Streamly.Internal.Data.Fold.Types (Fold(..))
import Streamly.Internal.Data.SVar (defState)
import Control.Monad.Catch (MonadCatch)
import qualified Prelude
import qualified Control.Monad.Catch as MC
import qualified Data.Tuple as Tuple
import qualified Streamly.Streams.StreamK as K
@ -152,7 +154,7 @@ lmap f (Unfold ustep uinject) = Unfold ustep (uinject . f)
--
{-# INLINE_NORMAL supply #-}
supply :: Unfold m a b -> a -> Unfold m Void b
supply unf a = lmap (const a) unf
supply unf a = lmap (Prelude.const a) unf
-- | Supply the first component of the tuple to an unfold that accepts a tuple
-- as a seed resulting in a fold that accepts the second component of the tuple
@ -344,6 +346,12 @@ singleton f = Unfold step inject
identity :: Monad m => Unfold m a a
identity = singleton return
-- | Build an 'Unfold' that ignores its seed and, on every step, runs the
-- supplied action and yields its result. The unfold never stops on its own,
-- so the generated stream is infinite unless it is cut short downstream.
--
{-# INLINE const #-}
const :: Monad m => m b -> Unfold m a b
const m = Unfold step inject

    where

    -- The seed is discarded; no state is carried between steps.
    inject _ = return ()

    -- Run the action afresh on each step and yield whatever it returns.
    step () = m >>= \r -> return $ Yield r ()
-- | Generates a stream replicating the seed @n@ times.
--
{-# INLINE replicateM #-}

View File

@ -26,6 +26,7 @@ module Streamly.Internal.Memory.ArrayStream
-- * Flattening to elements
, concat
, concatRev
, interpose
, interposeSuffix
, intercalateSuffix
@ -51,7 +52,6 @@ import Streamly.Internal.Memory.Array.Types (Array(..), length)
import Streamly.Streams.Serial (SerialT)
import Streamly.Streams.StreamK.Type (IsStream)
import qualified Streamly.Internal.Data.Unfold as UF
import qualified Streamly.Internal.Memory.Array as A
import qualified Streamly.Internal.Memory.Array.Types as A
import qualified Streamly.Internal.Prelude as S
@ -86,10 +86,18 @@ concat m = D.fromStreamD $ D.concatMapU A.read (D.toStreamD m)
concatRev :: (IsStream t, MonadIO m, Storable a) => t m (Array a) -> t m a
concatRev m = D.fromStreamD $ A.flattenArraysRev (D.toStreamD m)
-- | Flatten a stream of arrays after inserting the given element between
-- arrays.
--
-- This is @S.interpose@ specialised to arrays: each array is unfolded with
-- @A.read@ and the element @x@ is used as the separator.
--
-- /Internal/
{-# INLINE interpose #-}
interpose :: (MonadIO m, IsStream t, Storable a) => a -> t m (Array a) -> t m a
interpose x = S.interpose x A.read
{-# INLINE intercalateSuffix #-}
intercalateSuffix :: (MonadIO m, IsStream t, Storable a)
=> Array a -> t m (Array a) -> t m a
intercalateSuffix arr = S.intercalateSuffix A.read arr A.read
intercalateSuffix arr = S.intercalateSuffix arr A.read
-- | Flatten a stream of arrays appending the given element after each
-- array.
@ -99,7 +107,7 @@ intercalateSuffix arr = S.intercalateSuffix A.read arr A.read
interposeSuffix :: (MonadIO m, IsStream t, Storable a)
=> a -> t m (Array a) -> t m a
-- interposeSuffix x = D.fromStreamD . A.unlines x . D.toStreamD
interposeSuffix x = S.intercalateSuffix UF.identity x A.read
interposeSuffix x = S.interposeSuffix x A.read
-- | Split a stream of arrays on a given separator byte, dropping the separator
-- and coalescing all the arrays between two separators into a single array.

View File

@ -33,7 +33,7 @@ module Streamly.Internal.Prelude
-- ** From Values
, yield
, yieldM
, K.repeat
, repeat
, repeatM
, replicate
, replicateM
@ -205,9 +205,9 @@ module Streamly.Internal.Prelude
, insertBy
, intersperseM
, intersperse
, insertAfterEach
, intersperseSuffix
-- , intersperseBySpan
, interject
, interjectSuffix
-- ** Reordering
, reverse
@ -220,8 +220,9 @@ module Streamly.Internal.Prelude
-- ** Interleaving
, interleave
, interleaveFst
, interleaveMin
, interleaveSuffix
, interleaveInfix
, Serial.wSerialFst
, Serial.wSerialMin
@ -254,8 +255,12 @@ module Streamly.Internal.Prelude
, concatUnfoldRoundrobin
, concatMap
, concatMapWith
-- , intercalate
, gintercalate
, gintercalateSuffix
, intercalate
, intercalateSuffix
, interpose
, interposeSuffix
-- -- ** Breaking
@ -400,7 +405,8 @@ import Prelude
foldl, map, mapM, mapM_, sequence, all, any, sum, product, elem,
notElem, maximum, minimum, head, last, tail, length, null,
reverse, iterate, init, and, or, lookup, foldr1, (!!),
scanl, scanl1, replicate, concatMap, span, splitAt, break)
scanl, scanl1, replicate, concatMap, span, splitAt, break,
repeat)
import qualified Data.Heap as H
import qualified Data.Map.Strict as Map
@ -671,6 +677,14 @@ replicateMSerial n = fromStreamS . S.replicateM n
replicate :: (IsStream t, Monad m) => Int -> a -> t m a
replicate n = fromStreamS . S.replicate n
-- |
-- Generate an infinite stream by repeating a pure value.
--
-- The stream is built by @S.repeat@ and then converted to the requested
-- stream type with @fromStreamS@.
--
-- @since 0.4.0
{-# INLINE_NORMAL repeat #-}
repeat :: (IsStream t, Monad m) => a -> t m a
repeat = fromStreamS . S.repeat
-- |
-- @
-- repeatM = fix . consM
@ -687,9 +701,14 @@ replicate n = fromStreamS . S.replicate n
-- /Concurrent, infinite (do not use with 'parallely')/
--
-- @since 0.2.0
{-# INLINE_EARLY repeatM #-}
repeatM :: (IsStream t, MonadAsync m) => m a -> t m a
repeatM = go
where go m = m |: go m
repeatM = K.repeatM
{-# RULES "repeatM serial" repeatM = repeatMSerial #-}
{-# INLINE repeatMSerial #-}
repeatMSerial :: MonadAsync m => m a -> SerialT m a
repeatMSerial = fromStreamS . S.repeatM
-- |
-- @
@ -1831,9 +1850,9 @@ intersperse a = fromStreamS . S.intersperse a . toStreamS
-- | Insert a monadic action after each element in the stream.
--
-- @since 0.7.0
{-# INLINE insertAfterEach #-}
insertAfterEach :: (IsStream t, MonadAsync m) => m a -> t m a -> t m a
insertAfterEach m = fromStreamD . D.insertAfterEach m . toStreamD
{-# INLINE intersperseSuffix #-}
intersperseSuffix :: (IsStream t, MonadAsync m) => m a -> t m a -> t m a
intersperseSuffix m = fromStreamD . D.intersperseSuffix m . toStreamD
{-
-- | Intersperse a monadic action into the input stream after every @n@
@ -1854,16 +1873,16 @@ intersperseBySpan _n _f _xs = undefined
-- seconds.
--
-- @
-- > S.drain $ S.interject 1 (putChar ',') $ S.mapM (\\x -> threadDelay 1000000 >> putChar x) $ S.fromList "hello"
-- > S.drain $ S.interjectSuffix 1 (putChar ',') $ S.mapM (\\x -> threadDelay 1000000 >> putChar x) $ S.fromList "hello"
-- "h,e,l,l,o"
-- @
--
-- @since 0.7.0
{-# INLINE interject #-}
interject
{-# INLINE interjectSuffix #-}
interjectSuffix
:: (IsStream t, MonadAsync m)
=> Double -> m a -> t m a -> t m a
interject n f xs = xs `Par.parallelFst` repeatM timed
interjectSuffix n f xs = xs `Par.parallelFst` repeatM timed
where timed = liftIO (threadDelay (round $ n * 1000000)) >> f
-- | @insertBy cmp elem stream@ inserts @elem@ before the first element in
@ -2144,13 +2163,26 @@ concatMap f m = fromStreamD $ D.concatMap (toStreamD . f) (toStreamD m)
append ::(IsStream t, Monad m) => t m b -> t m b -> t m b
append m1 m2 = fromStreamD $ D.append (toStreamD m1) (toStreamD m2)
-- Same as 'wSerial'. We should perhaps rename wSerial. If named explicitly
-- this would be interleaveMax.
-- XXX Same as 'wSerial'. We should perhaps rename wSerial to interleave.
-- XXX Document the interleaving behavior of side effects in all the
-- interleaving combinators.
-- XXX Write time-domain equivalents of these. In the time domain we can
-- interleave two streams such that the value of second stream is always taken
-- from its last value even if no new value is being yielded, like
-- zipWithLatest. It would be something like interleaveWithLatest.
--
-- | Interleaves the outputs of two streams, yielding elements from each stream
-- alternately, starting from the first stream. If any of the streams finishes
-- early the other stream continues alone until it too finishes.
--
-- >>> :set -XOverloadedStrings
-- >>> interleave "ab" ",,,," :: SerialT Identity Char
-- fromList "a,b,,,"
-- >>> interleave "abcd" ",," :: SerialT Identity Char
-- fromList "a,b,cd"
--
-- 'interleave' is dual to 'interleaveMin', it can be called @interleaveMax@.
--
-- Do not use at scale in concatMapWith.
--
-- @since 0.7.0
@ -2160,18 +2192,63 @@ interleave m1 m2 = fromStreamD $ D.interleave (toStreamD m1) (toStreamD m2)
-- | Interleaves the outputs of two streams, yielding elements from each stream
-- alternately, starting from the first stream. As soon as the first stream
-- finishes the output stops discarding the second stream.
-- finishes, the output stops, discarding the remaining part of the second
-- stream. In this case, the last element in the resulting stream would be from
-- the second stream. If the second stream finishes early then the first stream
-- still continues to yield elements until it finishes.
--
-- >>> :set -XOverloadedStrings
-- >>> interleaveSuffix "abc" ",,,," :: SerialT Identity Char
-- fromList "a,b,c,"
-- >>> interleaveSuffix "abc" "," :: SerialT Identity Char
-- fromList "a,bc"
--
-- 'interleaveSuffix' is a dual of 'interleaveInfix'.
--
-- Do not use at scale in concatMapWith.
--
-- @since 0.7.0
{-# INLINE interleaveFst #-}
interleaveFst ::(IsStream t, Monad m) => t m b -> t m b -> t m b
interleaveFst m1 m2 = fromStreamD $ D.interleaveFst (toStreamD m1) (toStreamD m2)
{-# INLINE interleaveSuffix #-}
interleaveSuffix ::(IsStream t, Monad m) => t m b -> t m b -> t m b
interleaveSuffix m1 m2 =
fromStreamD $ D.interleaveSuffix (toStreamD m1) (toStreamD m2)
-- | Interleaves the outputs of two streams, yielding elements from each stream
-- alternately, starting from the first stream and ending at the first stream.
-- Elements of the second stream are infixed between consecutive elements of
-- the first stream. If the second stream is longer than the first, its
-- leftover elements are discarded once the first stream finishes. If the
-- first stream is longer then it continues yielding elements even after the
-- second stream has finished.
--
-- >>> :set -XOverloadedStrings
-- >>> interleaveInfix "abc" ",,,," :: SerialT Identity Char
-- fromList "a,b,c"
-- >>> interleaveInfix "abc" "," :: SerialT Identity Char
-- fromList "a,bc"
--
-- 'interleaveInfix' is a dual of 'interleaveSuffix'.
--
-- Do not use at scale in concatMapWith.
--
-- @since 0.7.0
{-# INLINE interleaveInfix #-}
interleaveInfix ::(IsStream t, Monad m) => t m b -> t m b -> t m b
interleaveInfix m1 m2 =
    fromStreamD $ D.interleaveInfix (toStreamD m1) (toStreamD m2)
-- | Interleaves the outputs of two streams, yielding elements from each stream
-- alternately, starting from the first stream. The output stops as soon as any
-- of the two streams finishes discarding the other.
-- of the two streams finishes, discarding the remaining part of the other
-- stream. The last element of the resulting stream would be from the longer
-- stream.
--
-- >>> :set -XOverloadedStrings
-- >>> interleaveMin "ab" ",,,," :: SerialT Identity Char
-- fromList "a,b,"
-- >>> interleaveMin "abcd" ",," :: SerialT Identity Char
-- fromList "a,b,c"
--
-- 'interleaveMin' is dual to 'interleave'.
--
-- Do not use at scale in concatMapWith.
--
@ -2235,29 +2312,103 @@ concatUnfoldRoundrobin ::(IsStream t, Monad m)
concatUnfoldRoundrobin u m =
fromStreamD $ D.concatUnfoldRoundrobin u (toStreamD m)
{-
-- | Insert a stream between segements of streams and flatten.
intercalate :: (IsStream t, MonadAsync m)
=> SerialT Identity b -> (a -> t m b) -> t m a -> t m b
intercalate = undefined
-}
-- XXX we can swap the order of arguments to gintercalate so that the
-- definition of concatUnfold becomes simpler? The first stream should be
-- infixed inside the second one. However, if we change the order in
-- "interleave" as well similarly, then that will make it a bit unintuitive.
--
-- > concatUnfold unf str =
-- > gintercalate unf str (UF.nilM (\_ -> return ())) (repeat ())
--
-- | 'interleaveInfix' followed by unfold and concat.
--
-- /Internal/
{-# INLINE gintercalate #-}
gintercalate
:: (IsStream t, Monad m)
=> Unfold m a c -> t m a -> Unfold m b c -> t m b -> t m c
gintercalate unf1 str1 unf2 str2 =
D.fromStreamD $ D.gintercalate
unf1 (D.toStreamD str1)
unf2 (D.toStreamD str2)
-- | @intercalateSuffix genSuffix seed gen stream@ generates streams from seeds
-- in the input @stream@ using the generator @gen@ and concatenates them
-- after suffixing the stream generated by @genSuffix@ using @seed@.
-- XXX The order of arguments in "intercalate" is consistent with the list
-- intercalate but inconsistent with gintercalate and other stream interleaving
-- combinators. We can change the order of the arguments in other combinators
-- but then 'interleave' combinator may become a bit unintuitive because we
-- will be starting with the second stream.
-- > intercalate seed unf str = gintercalate unf str unf (repeatM seed)
-- > intercalate a unf str = concatUnfold unf $ intersperse a str
--
-- For example to insert CRLF in a stream of character strings:
-- | 'intersperse' followed by unfold and concat.
--
-- > unlines = intercalateSuffix UF.fromList "\r\n" UF.fromList
-- > unwords = intercalate " " UF.fromList
--
-- >>> intercalate " " UF.fromList ["abc", "def", "ghi"]
-- > "abc def ghi"
--
{-# INLINE intercalate #-}
intercalate :: (IsStream t, Monad m)
=> b -> Unfold m b c -> t m b -> t m c
intercalate seed unf str = D.fromStreamD $
D.concatMapU unf $ D.intersperse seed (toStreamD str)
-- > interpose x unf str = gintercalate unf str UF.identity (repeat x)
--
-- | Unfold the elements of a stream, intersperse the given element between the
-- unfolded streams and then concat them into a single stream.
--
-- > unwords = S.interpose ' '
--
-- The separator @x@ is a pure value; it is lifted with 'return' before being
-- passed to @D.interpose@.
--
-- /Internal/
{-# INLINE interpose #-}
interpose :: (IsStream t, Monad m)
    => c -> Unfold m b c -> t m b -> t m c
interpose x unf str =
    D.fromStreamD $ D.interpose (return x) unf (D.toStreamD str)
-- | 'interleaveSuffix' followed by unfold and concat.
--
-- /Internal/
{-# INLINE gintercalateSuffix #-}
gintercalateSuffix
:: (IsStream t, Monad m)
=> Unfold m a c -> t m a -> Unfold m b c -> t m b -> t m c
gintercalateSuffix unf1 str1 unf2 str2 =
D.fromStreamD $ D.gintercalateSuffix
unf1 (D.toStreamD str1)
unf2 (D.toStreamD str2)
-- > intercalateSuffix seed unf str = gintercalateSuffix unf str unf (repeatM seed)
-- > intercalateSuffix a unf str = concatUnfold unf $ intersperseSuffix a str
--
-- | 'intersperseSuffix' followed by unfold and concat.
--
-- > unlines = intercalateSuffix "\n" UF.fromList
--
-- >>> intercalateSuffix "\n" UF.fromList ["abc", "def", "ghi"]
-- > "abc\ndef\nghi\n"
--
{-# INLINE intercalateSuffix #-}
intercalateSuffix
:: (IsStream t, Monad m)
=> Unfold m a c -> a -> Unfold m b c -> t m b -> t m c
intercalateSuffix suffix seed unf str =
D.fromStreamD $ D.interleaveFstThenConcat
unf (D.toStreamD str)
suffix (D.repeat seed)
intercalateSuffix :: (IsStream t, Monad m)
=> b -> Unfold m b c -> t m b -> t m c
intercalateSuffix seed unf str = fromStreamD $ D.concatMapU unf
$ D.intersperseSuffix (return seed) (D.toStreamD str)
-- > interposeSuffix x unf str = gintercalateSuffix unf str UF.identity (repeat x)
--
-- | Unfold the elements of a stream, append the given element after each
-- unfolded stream and then concat them into a single stream.
--
-- > unlines = S.interposeSuffix '\n'
--
-- The suffix @x@ is a pure value; it is lifted with 'return' because
-- @D.interposeSuffix@ takes a monadic action for the suffix.
--
-- /Internal/
{-# INLINE interposeSuffix #-}
interposeSuffix :: (IsStream t, Monad m)
    => c -> Unfold m b c -> t m b -> t m c
interposeSuffix x unf str =
    D.fromStreamD $ D.interposeSuffix (return x) unf (D.toStreamD str)
------------------------------------------------------------------------------
-- Grouping/Splitting
@ -2407,7 +2558,7 @@ intervalsOf
=> Double -> Fold m a b -> t m a -> t m b
intervalsOf n f xs =
splitWithSuffix isNothing (FL.lcatMaybes f)
(interject n (return Nothing) (Serial.map Just xs))
(interjectSuffix n (return Nothing) (Serial.map Just xs))
------------------------------------------------------------------------------
-- Element Aware APIs

View File

@ -65,6 +65,7 @@ module Streamly.Streams.StreamD
-- ** Specialized Generation
-- | Generate a monadic stream from a seed.
, repeat
, repeatM
, replicate
, replicateM
, fromIndices
@ -148,9 +149,13 @@ module Streamly.Streams.StreamD
, InterleaveState(..)
, interleave
, interleaveMin
, interleaveFst
, interleaveSuffix
, interleaveInfix
, roundRobin -- interleaveFair?/ParallelFair
, interleaveFstThenConcat
, gintercalateSuffix
, interposeSuffix
, gintercalate
, interpose
-- ** Grouping
, groupsOf
@ -229,7 +234,7 @@ module Streamly.Streams.StreamD
-- * Inserting
, intersperseM
, intersperse
, insertAfterEach
, intersperseSuffix
, insertBy
-- * Deleting
@ -419,6 +424,9 @@ unfold (Unfold ustep inject) seed = Stream step Nothing
-- Specialized Generation
------------------------------------------------------------------------------
-- | Generate an infinite stream by running the given monadic action again on
-- every step and yielding its result.
repeatM :: Monad m => m a -> Stream m a
repeatM action = Stream step ()

    where

    -- The stream carries no state; both arguments are ignored.
    step _ _ = action >>= \r -> return (Yield r ())
-- | Generate an infinite stream that yields the same pure value on every
-- step. The stream state is the unit value and never changes.
repeat :: Monad m => a -> Stream m a
repeat x = Stream (\_ _ -> return $ Yield x ()) ()
@ -2103,9 +2111,9 @@ interleaveMin (Stream step1 state1) (Stream step2 state2) =
step _ (InterleaveFirstOnly _) = undefined
step _ (InterleaveSecondOnly _) = undefined
{-# INLINE_NORMAL interleaveFst #-}
interleaveFst :: Monad m => Stream m a -> Stream m a -> Stream m a
interleaveFst (Stream step1 state1) (Stream step2 state2) =
{-# INLINE_NORMAL interleaveSuffix #-}
interleaveSuffix :: Monad m => Stream m a -> Stream m a -> Stream m a
interleaveSuffix (Stream step1 state1) (Stream step2 state2) =
Stream step (InterleaveFirst state1 state2)
where
@ -2134,6 +2142,52 @@ interleaveFst (Stream step1 state1) (Stream step2 state2) =
step _ (InterleaveSecondOnly _) = undefined
-- State of the 'interleaveInfix' state machine. @s1@ and @s2@ are the states
-- of the first and the second stream respectively, and @a@ is the element
-- type.
data InterleaveInfixState s1 s2 a
    -- Pull (and yield) the very first element of the first stream.
    = InterleaveInfixFirst s1 s2
    -- Pull an element from the second stream and buffer it.
    | InterleaveInfixSecondBuf s1 s2
    -- Holding a buffered element of the second stream; it is yielded only if
    -- the first stream produces one more element.
    | InterleaveInfixSecondYield s1 s2 a
    -- Holding an element of the first stream that is yielded next.
    | InterleaveInfixFirstYield s1 s2 a
    -- The second stream is exhausted; drain the first stream alone.
    | InterleaveInfixFirstOnly s1
-- Interleave two streams so that elements of the second stream appear only
-- between elements of the first one. An element of the second stream is
-- buffered and emitted only after the first stream is known to have one more
-- element, so the output never ends with an element of the second stream
-- while both streams are live.
{-# INLINE_NORMAL interleaveInfix #-}
interleaveInfix :: Monad m => Stream m a -> Stream m a -> Stream m a
interleaveInfix (Stream stepMain seedMain) (Stream stepSep seedSep) =
    Stream go (InterleaveInfixFirst seedMain seedSep)

    where

    {-# INLINE_LATE go #-}
    -- Emit the first element of the main stream, if there is one.
    go gst (InterleaveInfixFirst sMain sSep) = do
        res <- stepMain gst sMain
        return $ case res of
            Yield x sMain' -> Yield x (InterleaveInfixSecondBuf sMain' sSep)
            Skip sMain' -> Skip (InterleaveInfixFirst sMain' sSep)
            Stop -> Stop

    -- Fetch the next separator and hold on to it without emitting it yet.
    go gst (InterleaveInfixSecondBuf sMain sSep) = do
        res <- stepSep gst sSep
        return $ case res of
            Yield sep sSep' -> Skip (InterleaveInfixSecondYield sMain sSep' sep)
            Skip sSep' -> Skip (InterleaveInfixSecondBuf sMain sSep')
            Stop -> Skip (InterleaveInfixFirstOnly sMain)

    -- Emit the held separator only when the main stream has another element;
    -- if the main stream is finished the separator is dropped.
    go gst (InterleaveInfixSecondYield sMain sSep sep) = do
        res <- stepMain gst sMain
        return $ case res of
            Yield x sMain' -> Yield sep (InterleaveInfixFirstYield sMain' sSep x)
            Skip sMain' -> Skip (InterleaveInfixSecondYield sMain' sSep sep)
            Stop -> Stop

    -- Emit the main stream element that was pulled while the separator was
    -- pending.
    go _ (InterleaveInfixFirstYield sMain sSep x) =
        return $ Yield x (InterleaveInfixSecondBuf sMain sSep)

    -- The separator stream is exhausted; pass the main stream through.
    go gst (InterleaveInfixFirstOnly sMain) = do
        res <- stepMain gst sMain
        return $ case res of
            Yield x sMain' -> Yield x (InterleaveInfixFirstOnly sMain')
            Skip sMain' -> Skip (InterleaveInfixFirstOnly sMain')
            Stop -> Stop
{-# INLINE_NORMAL roundRobin #-}
roundRobin :: Monad m => Stream m a -> Stream m a -> Stream m a
roundRobin (Stream step1 state1) (Stream step2 state2) =
@ -2190,11 +2244,11 @@ data ICUState s1 s2 i1 i2 =
-- => [streamA1, streamB1, streamA2...StreamAn, streamBn]
-- => [a11, a12, ...a1j, b11, b12, ...b1k, a21, a22, ...]
--
{-# INLINE_NORMAL interleaveFstThenConcat #-}
interleaveFstThenConcat
{-# INLINE_NORMAL gintercalateSuffix #-}
gintercalateSuffix
:: Monad m
=> Unfold m a c -> Stream m a -> Unfold m b c -> Stream m b -> Stream m c
interleaveFstThenConcat
gintercalateSuffix
(Unfold istep1 inject1) (Stream step1 state1)
(Unfold istep2 inject2) (Stream step2 state2) =
Stream step (ICUFirst state1 state2)
@ -2253,6 +2307,269 @@ interleaveFstThenConcat
step _ (ICUSecondOnly _s2) = undefined
step _ (ICUSecondOnlyInner _s2 _i2) = undefined
-- State of the 'interposeSuffix' state machine. @s1@ is the state of the
-- outer stream, @i1@ is the state of the unfold currently being run.
data InterposeSuffixState s1 i1
    -- Pull the next element from the outer stream and inject it into the
    -- unfold.
    = InterposeSuffixFirst s1
    -- Disabled alternative state, currently unused:
    --   InterposeSuffixFirstYield s1 i1
    -- Emit the elements produced by the unfold for the current outer element.
    | InterposeSuffixFirstInner s1 i1
    -- Run the separator action and emit its result.
    | InterposeSuffixSecond s1
-- Unfold each element of the input stream and emit the result of the given
-- action after (suffix) the output of every unfold.
--
-- Note that if an unfolded layer turns out to be nil we still emit the
-- separator effect. An alternate behavior could be to emit the separator
-- effect only if at least one element has been yielded by the unfolding.
-- However, that becomes a bit complicated, so we have chosen the former
-- behavior for now. (The alternative would need an extra
-- InterposeSuffixFirstYield state that goes back to InterposeSuffixFirst,
-- bypassing the separator, when the unfold stops before yielding anything.)
{-# INLINE_NORMAL interposeSuffix #-}
interposeSuffix
    :: Monad m
    => m c -> Unfold m b c -> Stream m b -> Stream m c
interposeSuffix
    action
    (Unfold istep1 inject1) (Stream step1 state1) =
    Stream step (InterposeSuffixFirst state1)

    where

    -- Fetch the next outer element and seed the unfold with it. The seed is
    -- forced before moving to the next state.
    {-# INLINE_LATE step #-}
    step gst (InterposeSuffixFirst outer) =
        step1 (adaptState gst) outer >>= \res -> case res of
            Yield a outer' ->
                inject1 a >>= \inner ->
                    inner `seq`
                        return (Skip (InterposeSuffixFirstInner outer' inner))
            Skip outer' -> return $ Skip (InterposeSuffixFirst outer')
            Stop -> return Stop

    -- Drain the current unfold; once it stops, the separator is due.
    step _ (InterposeSuffixFirstInner outer inner) =
        istep1 inner >>= \res -> return $ case res of
            Yield x inner' -> Yield x (InterposeSuffixFirstInner outer inner')
            Skip inner' -> Skip (InterposeSuffixFirstInner outer inner')
            Stop -> Skip (InterposeSuffixSecond outer)

    -- Run the separator action, emit its result and go back to the outer
    -- stream.
    step _ (InterposeSuffixSecond outer) =
        action >>= \sep -> return $ Yield sep (InterposeSuffixFirst outer)
-- State of the 'gintercalate' state machine. @s1@/@s2@ are the states of the
-- first/second outer streams, @i1@/@i2@ the states of the unfolds run on
-- their elements. The type parameter @a@ is not used by any active
-- constructor; it is only mentioned by the disabled variants listed below.
data ICALState s1 s2 i1 i2 a
    -- Pull an element from the first stream and inject it into its unfold.
    = ICALFirst s1 s2
    -- Emit the elements of the current unfold of the first stream.
    | ICALFirstInner s1 s2 i1
    -- The second stream is over: pull from the first stream alone.
    | ICALFirstOnly s1
    -- The second stream is over: emit the current unfold of the first stream.
    | ICALFirstOnlyInner s1 i1
    -- Pull an element from the second stream and inject it into its unfold.
    | ICALSecondInject s1 s2
    -- A second-stream unfold is pending: pull and inject the next element of
    -- the first stream.
    | ICALFirstInject s1 s2 i2
    -- Emit the elements of the pending unfold of the second stream.
    | ICALSecondInner s1 s2 i1 i2
    -- Disabled alternative states, currently unused:
    --   ICALFirstYield s1 s2 i1
    --   ICALFirstBuf s1 s2 i1 i2
    --   ICALSecondInner s1 s2 i1 i2 a
    --   ICALFirstResume s1 s2 i1 i2 a
-- | Interleave streams (full streams, not the elements) unfolded from two
-- input streams and concat. The streams unfolded from the second input are
-- placed between (infix) the streams unfolded from the first input. Stop when
-- the first stream stops. If the second stream ends before the first one then
-- the first stream still keeps running alone without any interleaving with
-- the second stream.
--
--    [a1, a2, ... an]                   [b1, b2 ...]
-- => [streamA1, streamA2, ... streamAn] [streamB1, streamB2, ...]
-- => [streamA1, streamB1, streamA2, ... streamB(n-1), streamAn]
-- => [a11, a12, ...a1j, b11, b12, ...b1k, a21, a22, ...]
--
-- An element of the second stream is pulled and injected before it is known
-- whether the first stream has another element; if the first stream then
-- stops, that injected separator is never emitted.
--
{-# INLINE_NORMAL gintercalate #-}
gintercalate
    :: Monad m
    => Unfold m a c -> Stream m a -> Unfold m b c -> Stream m b -> Stream m c
gintercalate
    (Unfold istep1 inject1) (Stream step1 state1)
    (Unfold istep2 inject2) (Stream step2 state2) =
    Stream step (ICALFirst state1 state2)

    where

    -- Start: pull the first element of the first stream and seed its unfold.
    -- The seed is forced before moving to the next state.
    {-# INLINE_LATE step #-}
    step gst (ICALFirst outer1 outer2) =
        step1 (adaptState gst) outer1 >>= \res -> case res of
            Yield a outer1' ->
                inject1 a >>= \inner1 ->
                    inner1 `seq`
                        return (Skip (ICALFirstInner outer1' outer2 inner1))
            Skip outer1' -> return $ Skip (ICALFirst outer1' outer2)
            Stop -> return Stop

    -- Drain the current unfold of the first stream, then go prepare a
    -- separator from the second stream.
    step _ (ICALFirstInner outer1 outer2 inner1) =
        istep1 inner1 >>= \res -> return $ case res of
            Yield x inner1' -> Yield x (ICALFirstInner outer1 outer2 inner1')
            Skip inner1' -> Skip (ICALFirstInner outer1 outer2 inner1')
            Stop -> Skip (ICALSecondInject outer1 outer2)

    -- The second stream has ended: keep unfolding the first stream alone.
    step gst (ICALFirstOnly outer1) =
        step1 (adaptState gst) outer1 >>= \res -> case res of
            Yield a outer1' ->
                inject1 a >>= \inner1 ->
                    inner1 `seq`
                        return (Skip (ICALFirstOnlyInner outer1' inner1))
            Skip outer1' -> return $ Skip (ICALFirstOnly outer1')
            Stop -> return Stop

    step _ (ICALFirstOnlyInner outer1 inner1) =
        istep1 inner1 >>= \res -> return $ case res of
            Yield x inner1' -> Yield x (ICALFirstOnlyInner outer1 inner1')
            Skip inner1' -> Skip (ICALFirstOnlyInner outer1 inner1')
            Stop -> Skip (ICALFirstOnly outer1)

    -- We inject the second stream even before checking if the first stream
    -- would yield any more elements. There is no clear choice whether we
    -- should do this before or after that. Doing it after may make the state
    -- machine a bit simpler though.
    step gst (ICALSecondInject outer1 outer2) =
        step2 (adaptState gst) outer2 >>= \res -> case res of
            Yield b outer2' ->
                inject2 b >>= \inner2 ->
                    inner2 `seq`
                        return (Skip (ICALFirstInject outer1 outer2' inner2))
            Skip outer2' -> return $ Skip (ICALSecondInject outer1 outer2')
            Stop -> return $ Skip (ICALFirstOnly outer1)

    -- A separator unfold is pending. It is emitted only if the first stream
    -- has another element; otherwise the whole stream stops here.
    step gst (ICALFirstInject outer1 outer2 inner2) =
        step1 (adaptState gst) outer1 >>= \res -> case res of
            Yield a outer1' ->
                inject1 a >>= \inner1 ->
                    inner1 `seq`
                        return
                            (Skip (ICALSecondInner outer1' outer2 inner1 inner2))
            Skip outer1' -> return $ Skip (ICALFirstInject outer1' outer2 inner2)
            Stop -> return Stop

    -- Drain the pending separator unfold, then emit the already injected
    -- unfold of the first stream.
    --
    -- Disabled experimental variants (ICALFirstYield, ICALFirstBuf,
    -- ICALFirstResume) are not implemented here; they would pull and buffer
    -- the first element of the next first-stream unfold before emitting the
    -- separator and yield that buffered element afterwards.
    step _ (ICALSecondInner outer1 outer2 inner1 inner2) =
        istep2 inner2 >>= \res -> return $ case res of
            Yield x inner2' ->
                Yield x (ICALSecondInner outer1 outer2 inner1 inner2')
            Skip inner2' -> Skip (ICALSecondInner outer1 outer2 inner1 inner2')
            Stop -> Skip (ICALFirstInner outer1 outer2 inner1)
-- State of the 'interpose' state machine. @s1@ is the state of the outer
-- stream and @i1@ the state of the unfold being run. The type parameter @a@
-- is not used by any active constructor; it is only mentioned by the disabled
-- variants listed below.
data InterposeState s1 i1 a
    -- Pull the first element of the outer stream and inject it.
    = InterposeFirst s1
    -- Emit the elements produced by the current unfold.
    | InterposeFirstInner s1 i1
    -- The previous unfold is done: pull and inject the next outer element.
    | InterposeFirstInject s1
    -- Run the separator action and emit its result before the unfold that
    -- has just been injected.
    | InterposeSecondYield s1 i1
    -- Disabled alternative states, currently unused:
    --   InterposeFirstYield s1 i1
    --   InterposeFirstBuf s1 i1
    --   InterposeSecondYield s1 i1 a
    --   InterposeFirstResume s1 i1 a
-- Unfold each element of the input stream and emit the result of the given
-- action between (infix) the outputs of consecutive unfolds. The separator is
-- produced only after the next outer element has been pulled and injected, so
-- nothing is emitted after the last unfold.
--
-- Note that this only interposes the pure values; we may run many effects to
-- generate those values, as some effects may not generate anything (Skip).
-- The separator is emitted even when an unfold yields no elements.
{-# INLINE_NORMAL interpose #-}
interpose :: Monad m => m c -> Unfold m b c -> Stream m b -> Stream m c
interpose
    action
    (Unfold istep1 inject1) (Stream step1 state1) =
    Stream step (InterposeFirst state1)

    where

    -- Start: pull the first outer element and seed the unfold with it. The
    -- seed is forced before moving to the next state.
    {-# INLINE_LATE step #-}
    step gst (InterposeFirst outer) =
        step1 (adaptState gst) outer >>= \res -> case res of
            Yield a outer' ->
                inject1 a >>= \inner ->
                    inner `seq`
                        return (Skip (InterposeFirstInner outer' inner))
            Skip outer' -> return $ Skip (InterposeFirst outer')
            Stop -> return Stop

    -- Drain the current unfold, then look for the next outer element.
    step _ (InterposeFirstInner outer inner) =
        istep1 inner >>= \res -> return $ case res of
            Yield x inner' -> Yield x (InterposeFirstInner outer inner')
            Skip inner' -> Skip (InterposeFirstInner outer inner')
            Stop -> Skip (InterposeFirstInject outer)

    -- Pull and inject the next outer element. Only if there is one does the
    -- separator get emitted; otherwise the stream ends without a trailing
    -- separator.
    step gst (InterposeFirstInject outer) =
        step1 (adaptState gst) outer >>= \res -> case res of
            Yield a outer' ->
                inject1 a >>= \inner ->
                    inner `seq`
                        return (Skip (InterposeSecondYield outer' inner))
            Skip outer' -> return $ Skip (InterposeFirstInject outer')
            Stop -> return Stop

    -- Run the separator action, emit its result and continue with the unfold
    -- that was injected in the previous state.
    --
    -- Disabled experimental variants (InterposeFirstYield, InterposeFirstBuf,
    -- InterposeFirstResume) are not implemented here; they would pull and
    -- buffer the first element of the next unfold before running the
    -- separator action and yield that buffered element afterwards.
    step _ (InterposeSecondYield outer inner) =
        action >>= \sep -> return $ Yield sep (InterposeFirstInner outer inner)
------------------------------------------------------------------------------
-- Exceptions
------------------------------------------------------------------------------
@ -2853,9 +3170,9 @@ data SuffixState s a
| SuffixSuffix s
| SuffixYield a (SuffixState s a)
{-# INLINE_NORMAL insertAfterEach #-}
insertAfterEach :: forall m a. Monad m => m a -> Stream m a -> Stream m a
insertAfterEach action (Stream step state) = Stream step' (SuffixElem state)
{-# INLINE_NORMAL intersperseSuffix #-}
intersperseSuffix :: forall m a. Monad m => m a -> Stream m a -> Stream m a
intersperseSuffix action (Stream step state) = Stream step' (SuffixElem state)
where
{-# INLINE_LATE step' #-}
step' gst (SuffixElem st) = do

View File

@ -62,6 +62,7 @@ module Streamly.Streams.StreamK
-- ** Specialized Generation
, repeat
, repeatM
, replicate
, replicateM
, fromIndices
@ -268,9 +269,16 @@ once = yieldM
-- repeatM = cycle1 . yield
-- @
--
-- Generate an infinite stream by repeating a monadic value.
--
-- /Internal/
repeatM :: (IsStream t, MonadAsync m) => m a -> t m a
repeatM = loop

    where

    -- Prepend the action with '|:' and recurse on the same action, so the
    -- resulting stream has no end.
    loop action = action |: loop action
-- Generate an infinite stream by repeating a pure value.
--
-- @since 0.4.0
-- /Internal/
{-# INLINE repeat #-}
repeat :: IsStream t => a -> t m a
repeat a = xs

    where

    -- Self-referential stream: the value consed onto the stream itself.
    xs = cons a xs