From a9f3f9a0fb6c40903ed1560f2f619f3c27f1545c Mon Sep 17 00:00:00 2001 From: Harendra Kumar Date: Tue, 15 Oct 2019 00:34:40 +0530 Subject: [PATCH] Add unfoldWords/interpose/intercalate etc. * Rearrange, update benchmarks for words/unwords, lines/unlines * Add interpose/intercalate/interleaveInfix/interleaveSuffix to Prelude * Add unfoldWords to Data.String Use the newly added rotuines to write words/unwords and lines/unlines more idiomatically and with improved performance. --- benchmark/FileIO.hs | 28 +- src/Streamly/Benchmark/FileIO/Array.hs | 11 +- src/Streamly/Benchmark/FileIO/Stream.hs | 72 ++++- src/Streamly/Data/String.hs | 20 +- src/Streamly/Internal/Data/Unfold.hs | 12 +- src/Streamly/Internal/Memory/ArrayStream.hs | 14 +- src/Streamly/Internal/Prelude.hs | 233 ++++++++++--- src/Streamly/Streams/StreamD.hs | 341 +++++++++++++++++++- src/Streamly/Streams/StreamK.hs | 10 +- 9 files changed, 644 insertions(+), 97 deletions(-) diff --git a/benchmark/FileIO.hs b/benchmark/FileIO.hs index 58773372a..7397e9bf1 100644 --- a/benchmark/FileIO.hs +++ b/benchmark/FileIO.hs @@ -201,20 +201,32 @@ main = do Handles inh _ <- readIORef href BFS.chunksOf 1000 inh ] - , bgroup "group-ungroup" - [ mkBench "lines-unlines" href $ do + , bgroup "group-ungroup-stream" + [ mkBench "lines-unlines-[Char]" href $ do Handles inh outh <- readIORef href BFS.linesUnlinesCopy inh outh - , mkBench "lines-unlines-arrays" href $ do + , mkBench "lines-unlines-Word8Array" href $ do Handles inh outh <- readIORef href - BFA.linesUnlinesCopy inh outh - , mkBench "words-unwords" href $ do + BFS.linesUnlinesArrayWord8Copy inh outh + , mkBench "lines-unlines-CharArray" href $ do Handles inh outh <- readIORef href - BFS.wordsUnwordsCopy inh outh - , mkBench "words-unwords-word8" href $ do + BFS.linesUnlinesArrayCharCopy inh outh + , mkBench "words-unwords-[Word8]" href $ do Handles inh outh <- readIORef href BFS.wordsUnwordsCopyWord8 inh outh - , mkBench "words-unwords-arrays" href $ do + , mkBench "words-unwords-[Char]" href $ do + Handles inh outh <- readIORef href + BFS.wordsUnwordsCopy inh outh + , mkBench "words-unwords-CharArray" href $ do + Handles inh outh <- readIORef href + BFS.wordsUnwordsCharArrayCopy inh outh + ] + + , bgroup "group-ungroup-array-stream" + [ mkBench "lines-unlines-Word8Array" href $ do + Handles inh outh <- readIORef href + BFA.linesUnlinesCopy inh outh + , mkBench "words-unwords-Word8Array" href $ do Handles inh outh <- readIORef href BFA.wordsUnwordsCopy inh outh ] diff --git a/src/Streamly/Benchmark/FileIO/Array.hs b/src/Streamly/Benchmark/FileIO/Array.hs index 1683ee9c1..71eced52a 100644 --- a/src/Streamly/Benchmark/FileIO/Array.hs +++ b/src/Streamly/Benchmark/FileIO/Array.hs @@ -48,7 +48,6 @@ import qualified Streamly.Prelude as S import qualified Streamly.Data.String as SS import qualified Streamly.Internal.FileSystem.Handle as IFH -import qualified Streamly.Internal.Prelude as Internal import qualified Streamly.Internal.Memory.Array as IA import qualified Streamly.Internal.Memory.ArrayStream as AS import qualified Streamly.Internal.Data.Unfold as IUF @@ -186,8 +185,8 @@ inspect $ 'copy `hasNoType` ''Step {-# INLINE linesUnlinesCopy #-} linesUnlinesCopy :: Handle -> Handle -> IO () linesUnlinesCopy inh outh = - S.fold (IFH.writeArraysInChunksOf (1024*1024) outh) - $ Internal.insertAfterEach (return $ A.fromList [10]) + S.fold (IFH.writeInChunksOf (1024*1024) outh) + $ AS.interposeSuffix 10 $ AS.splitOnSuffix 10 $ IFH.toStreamArraysOf (1024*1024) inh @@ -200,9 +199,9 @@ inspect $ hasNoTypeClassesExcept 'linesUnlinesCopy [''Storable] {-# INLINE wordsUnwordsCopy #-} wordsUnwordsCopy :: Handle -> Handle -> IO () wordsUnwordsCopy inh outh = - S.fold (IFH.writeArraysInChunksOf (1024*1024) outh) - $ S.intersperse (A.fromList [32]) - -- XXX use a word splitting combinator + S.fold (IFH.writeInChunksOf (1024*1024) outh) + $ AS.interpose 32 + -- XXX this is not correct word splitting combinator $ AS.splitOn 32 $ IFH.toStreamArraysOf (1024*1024) inh diff --git a/src/Streamly/Benchmark/FileIO/Stream.hs b/src/Streamly/Benchmark/FileIO/Stream.hs index 2d69114d4..db5b72c5c 100644 --- a/src/Streamly/Benchmark/FileIO/Stream.hs +++ b/src/Streamly/Benchmark/FileIO/Stream.hs @@ -40,8 +40,12 @@ module Streamly.Benchmark.FileIO.Stream , catFinallyStream , copy , linesUnlinesCopy + , linesUnlinesArrayWord8Copy + , linesUnlinesArrayCharCopy + -- , linesUnlinesArrayUtf8Copy , wordsUnwordsCopyWord8 , wordsUnwordsCopy + , wordsUnwordsCharArrayCopy , readWord8 , decodeChar8 , copyCodecChar8 @@ -68,9 +72,11 @@ import Prelude hiding (last, length) import qualified Streamly.FileSystem.Handle as FH import qualified Streamly.Internal.FileSystem.Handle as IFH import qualified Streamly.Memory.Array as A +-- import qualified Streamly.Internal.Memory.Array as IA import qualified Streamly.Internal.Memory.Array.Types as AT import qualified Streamly.Prelude as S import qualified Streamly.Data.Fold as FL +-- import qualified Streamly.Internal.Data.Fold as IFL import qualified Streamly.Data.String as SS import qualified Streamly.Internal.Data.Unfold as IUF import qualified Streamly.Internal.Prelude as IP @@ -400,11 +406,29 @@ inspect $ 'chunksOfD `hasNoType` ''AT.FlattenState inspect $ 'chunksOfD `hasNoType` ''D.ConcatMapUState #endif --- XXX splitSuffixOn requires -funfolding-use-threshold=150 for better fusion --- | Lines and unlines {-# INLINE linesUnlinesCopy #-} linesUnlinesCopy :: Handle -> Handle -> IO () linesUnlinesCopy inh outh = + S.fold (FH.write outh) + $ SS.encodeChar8 + $ SS.unfoldLines IUF.fromList + $ S.splitOnSuffix (== '\n') FL.toList + $ SS.decodeChar8 + $ S.unfold FH.read inh + +{-# INLINE linesUnlinesArrayWord8Copy #-} +linesUnlinesArrayWord8Copy :: Handle -> Handle -> IO () +linesUnlinesArrayWord8Copy inh outh = + S.fold (FH.write outh) + $ IP.interposeSuffix 10 A.read + $ S.splitOnSuffix (== 10) A.write + $ S.unfold FH.read inh + +-- XXX splitSuffixOn requires -funfolding-use-threshold=150 for better fusion +-- | Lines and unlines +{-# INLINE linesUnlinesArrayCharCopy #-} +linesUnlinesArrayCharCopy :: Handle -> Handle -> IO () +linesUnlinesArrayCharCopy inh outh = S.fold (FH.write outh) $ SS.encodeChar8 $ SS.unlines @@ -413,12 +437,26 @@ linesUnlinesCopy inh outh = $ S.unfold FH.read inh #ifdef INSPECTION -inspect $ hasNoTypeClassesExcept 'linesUnlinesCopy [''Storable] --- inspect $ 'linesUnlinesCopy `hasNoType` ''Step --- inspect $ 'linesUnlinesCopy `hasNoType` ''AT.FlattenState --- inspect $ 'linesUnlinesCopy `hasNoType` ''D.ConcatMapUState +inspect $ hasNoTypeClassesExcept 'linesUnlinesArrayCharCopy [''Storable] +-- inspect $ 'linesUnlinesArrayCharCopy `hasNoType` ''Step +-- inspect $ 'linesUnlinesArrayCharCopy `hasNoType` ''AT.FlattenState +-- inspect $ 'linesUnlinesArrayCharCopy `hasNoType` ''D.ConcatMapUState #endif +-- XXX to write this we need to be able to map decodeUtf8 on the A.read fold. +-- For that we have to write decodeUtf8 as a Pipe. +{- +{-# INLINE linesUnlinesArrayUtf8Copy #-} +linesUnlinesArrayUtf8Copy :: Handle -> Handle -> IO () +linesUnlinesArrayUtf8Copy inh outh = + S.fold (FH.write outh) + $ SS.encodeChar8 + $ IP.intercalate (A.fromList [10]) (pipe SS.decodeUtf8P A.read) + $ S.splitOnSuffix (== '\n') (IFL.lmap SS.encodeUtf8 A.write) + $ SS.decodeChar8 + $ S.unfold FH.read inh +-} + foreign import ccall unsafe "u_iswspace" iswspace :: Int -> Int @@ -446,8 +484,7 @@ isSp = isSpace . chr . fromIntegral wordsUnwordsCopyWord8 :: Handle -> Handle -> IO () wordsUnwordsCopyWord8 inh outh = S.fold (FH.write outh) - $ S.concatUnfold IUF.fromList - $ S.intersperse [32] + $ IP.interposeSuffix 32 IUF.fromList $ S.wordsBy isSp FL.toList $ S.unfold FH.read inh @@ -463,14 +500,7 @@ wordsUnwordsCopy :: Handle -> Handle -> IO () wordsUnwordsCopy inh outh = S.fold (FH.write outh) $ SS.encodeChar8 - $ S.concatUnfold IUF.fromList - $ S.intersperse " " - -- Array allocation is too expensive for such small strings. So just use - -- lists instead. - -- - -- -- $ SS.unwords - -- -- $ SS.words - -- + $ SS.unfoldWords IUF.fromList -- XXX This pipeline does not fuse with wordsBy but fuses with splitOn -- with -funfolding-use-threshold=300. With wordsBy it does not fuse -- even with high limits for inlining and spec-constr ghc options. With @@ -490,6 +520,16 @@ wordsUnwordsCopy inh outh = -- inspect $ 'wordsUnwordsCopy `hasNoType` ''D.ConcatMapUState #endif +{-# INLINE wordsUnwordsCharArrayCopy #-} +wordsUnwordsCharArrayCopy :: Handle -> Handle -> IO () +wordsUnwordsCharArrayCopy inh outh = + S.fold (FH.write outh) + $ SS.encodeChar8 + $ SS.unwords + $ SS.words + $ SS.decodeChar8 + $ S.unfold FH.read inh + lf :: Word8 lf = fromIntegral (ord '\n') diff --git a/src/Streamly/Data/String.hs b/src/Streamly/Data/String.hs index b41c67085..17ec6adcd 100644 --- a/src/Streamly/Data/String.hs +++ b/src/Streamly/Data/String.hs @@ -74,6 +74,7 @@ module Streamly.Data.String , foldLines , foldWords , unfoldLines + , unfoldWords -- * Streams of Strings , lines @@ -95,9 +96,7 @@ import Streamly.Internal.Data.Unfold (Unfold) import qualified Streamly.Internal.Prelude as S import qualified Streamly.Memory.Array as A -import qualified Streamly.Internal.Memory.ArrayStream as AS import qualified Streamly.Streams.StreamD as D -import qualified Streamly.Internal.Data.Unfold as UF -- type String = List Char @@ -269,11 +268,9 @@ words = foldWords A.write -- | Unfold a stream to character streams using the supplied 'Unfold' -- and concat the results suffixing a newline character @\\n@ to each stream. -- --- > unfoldLines = S.intercalateSuffix UF.identity '\n' --- {-# INLINE unfoldLines #-} unfoldLines :: (MonadIO m, IsStream t) => Unfold m a Char -> t m a -> t m Char -unfoldLines unf = S.intercalateSuffix UF.identity '\n' unf +unfoldLines = S.interposeSuffix '\n' -- | Flattens the stream of @Array Char@, after appending a terminating -- newline to each string. @@ -292,6 +289,14 @@ unfoldLines unf = S.intercalateSuffix UF.identity '\n' unf unlines :: (MonadIO m, IsStream t) => t m (Array Char) -> t m Char unlines = unfoldLines A.read +-- | Unfold the elements of a stream to character streams using the supplied +-- 'Unfold' and concat the results with a whitespace character infixed between +-- the streams. +-- +{-# INLINE unfoldWords #-} +unfoldWords :: (MonadIO m, IsStream t) => Unfold m a Char -> t m a -> t m Char +unfoldWords = S.interpose ' ' + -- | Flattens the stream of @Array Char@, after appending a separating -- space to each string. -- @@ -300,12 +305,11 @@ unlines = unfoldLines A.read -- >>> S.toList $ unwords $ S.fromList ["unwords", "this", "string"] -- "unwords this string" -- --- --- > unwords = A.concat . (S.intersperse (A.fromList " ")) +-- > unwords = unfoldWords A.read -- -- Note that, in general -- -- > unwords . words /= id {-# INLINE unwords #-} unwords :: (MonadAsync m, IsStream t) => t m (Array Char) -> t m Char -unwords = AS.concat . (S.intersperse (A.fromList " ")) +unwords = unfoldWords A.read diff --git a/src/Streamly/Internal/Data/Unfold.hs b/src/Streamly/Internal/Data/Unfold.hs index d32ef06e1..b71d2b187 100644 --- a/src/Streamly/Internal/Data/Unfold.hs +++ b/src/Streamly/Internal/Data/Unfold.hs @@ -83,6 +83,7 @@ module Streamly.Internal.Data.Unfold , effect , singleton , identity + , const , replicateM , fromList , fromListM @@ -119,7 +120,7 @@ where import Control.Exception (Exception) import Data.Void (Void) import GHC.Types (SPEC(..)) -import Prelude hiding (concat, map, takeWhile, take, filter) +import Prelude hiding (concat, map, takeWhile, take, filter, const) import Streamly.Streams.StreamD.Type (Stream(..), Step(..)) #if __GLASGOW_HASKELL__ < 800 @@ -130,6 +131,7 @@ import Streamly.Internal.Data.Fold.Types (Fold(..)) import Streamly.Internal.Data.SVar (defState) import Control.Monad.Catch (MonadCatch) +import qualified Prelude import qualified Control.Monad.Catch as MC import qualified Data.Tuple as Tuple import qualified Streamly.Streams.StreamK as K @@ -152,7 +154,7 @@ lmap f (Unfold ustep uinject) = Unfold ustep (uinject . f) -- {-# INLINE_NORMAL supply #-} supply :: Unfold m a b -> a -> Unfold m Void b -supply unf a = lmap (const a) unf +supply unf a = lmap (Prelude.const a) unf -- | Supply the first component of the tuple to an unfold that accepts a tuple -- as a seed resulting in a fold that accepts the second component of the tuple @@ -344,6 +346,12 @@ singleton f = Unfold step inject identity :: Monad m => Unfold m a a identity = singleton return +const :: Monad m => m b -> Unfold m a b +const m = Unfold step inject + where + inject _ = return () + step () = m >>= \r -> return $ Yield r () + -- | Generates a stream replicating the seed @n@ times. -- {-# INLINE replicateM #-} diff --git a/src/Streamly/Internal/Memory/ArrayStream.hs b/src/Streamly/Internal/Memory/ArrayStream.hs index cf187d913..eae8f95c9 100644 --- a/src/Streamly/Internal/Memory/ArrayStream.hs +++ b/src/Streamly/Internal/Memory/ArrayStream.hs @@ -26,6 +26,7 @@ module Streamly.Internal.Memory.ArrayStream -- * Flattening to elements , concat , concatRev + , interpose , interposeSuffix , intercalateSuffix @@ -51,7 +52,6 @@ import Streamly.Internal.Memory.Array.Types (Array(..), length) import Streamly.Streams.Serial (SerialT) import Streamly.Streams.StreamK.Type (IsStream) -import qualified Streamly.Internal.Data.Unfold as UF import qualified Streamly.Internal.Memory.Array as A import qualified Streamly.Internal.Memory.Array.Types as A import qualified Streamly.Internal.Prelude as S @@ -86,10 +86,18 @@ concat m = D.fromStreamD $ D.concatMapU A.read (D.toStreamD m) concatRev :: (IsStream t, MonadIO m, Storable a) => t m (Array a) -> t m a concatRev m = D.fromStreamD $ A.flattenArraysRev (D.toStreamD m) +-- | Flatten a stream of arrays after inserting the given element between +-- arrays. +-- +-- /Internal/ +{-# INLINE interpose #-} +interpose :: (MonadIO m, IsStream t, Storable a) => a -> t m (Array a) -> t m a +interpose x = S.interpose x A.read + {-# INLINE intercalateSuffix #-} intercalateSuffix :: (MonadIO m, IsStream t, Storable a) => Array a -> t m (Array a) -> t m a -intercalateSuffix arr = S.intercalateSuffix A.read arr A.read +intercalateSuffix arr = S.intercalateSuffix arr A.read -- | Flatten a stream of arrays appending the given element after each -- array. @@ -99,7 +107,7 @@ intercalateSuffix arr = S.intercalateSuffix A.read arr A.read interposeSuffix :: (MonadIO m, IsStream t, Storable a) => a -> t m (Array a) -> t m a -- interposeSuffix x = D.fromStreamD . A.unlines x . D.toStreamD -interposeSuffix x = S.intercalateSuffix UF.identity x A.read +interposeSuffix x = S.interposeSuffix x A.read -- | Split a stream of arrays on a given separator byte, dropping the separator -- and coalescing all the arrays between two separators into a single array. diff --git a/src/Streamly/Internal/Prelude.hs b/src/Streamly/Internal/Prelude.hs index c217bb82c..c731e3502 100644 --- a/src/Streamly/Internal/Prelude.hs +++ b/src/Streamly/Internal/Prelude.hs @@ -33,7 +33,7 @@ module Streamly.Internal.Prelude -- ** From Values , yield , yieldM - , K.repeat + , repeat , repeatM , replicate , replicateM @@ -205,9 +205,9 @@ module Streamly.Internal.Prelude , insertBy , intersperseM , intersperse - , insertAfterEach + , intersperseSuffix -- , intersperseBySpan - , interject + , interjectSuffix -- ** Reordering , reverse @@ -220,8 +220,9 @@ module Streamly.Internal.Prelude -- ** Interleaving , interleave - , interleaveFst , interleaveMin + , interleaveSuffix + , interleaveInfix , Serial.wSerialFst , Serial.wSerialMin @@ -254,8 +255,12 @@ module Streamly.Internal.Prelude , concatUnfoldRoundrobin , concatMap , concatMapWith - -- , intercalate + , gintercalate + , gintercalateSuffix + , intercalate , intercalateSuffix + , interpose + , interposeSuffix -- -- ** Breaking @@ -400,7 +405,8 @@ import Prelude foldl, map, mapM, mapM_, sequence, all, any, sum, product, elem, notElem, maximum, minimum, head, last, tail, length, null, reverse, iterate, init, and, or, lookup, foldr1, (!!), - scanl, scanl1, replicate, concatMap, span, splitAt, break) + scanl, scanl1, replicate, concatMap, span, splitAt, break, + repeat) import qualified Data.Heap as H import qualified Data.Map.Strict as Map @@ -671,6 +677,14 @@ replicateMSerial n = fromStreamS . S.replicateM n replicate :: (IsStream t, Monad m) => Int -> a -> t m a replicate n = fromStreamS . S.replicate n +-- | +-- Generate an infinite stream by repeating a pure value. +-- +-- @since 0.4.0 +{-# INLINE_NORMAL repeat #-} +repeat :: (IsStream t, Monad m) => a -> t m a +repeat = fromStreamS . S.repeat + -- | -- @ -- repeatM = fix . consM @@ -687,9 +701,14 @@ replicate n = fromStreamS . S.replicate n -- /Concurrent, infinite (do not use with 'parallely')/ -- -- @since 0.2.0 +{-# INLINE_EARLY repeatM #-} repeatM :: (IsStream t, MonadAsync m) => m a -> t m a -repeatM = go - where go m = m |: go m +repeatM = K.repeatM + +{-# RULES "repeatM serial" repeatM = repeatMSerial #-} +{-# INLINE repeatMSerial #-} +repeatMSerial :: MonadAsync m => m a -> SerialT m a +repeatMSerial = fromStreamS . S.repeatM -- | -- @ @@ -1831,9 +1850,9 @@ intersperse a = fromStreamS . S.intersperse a . toStreamS -- | Insert a monadic action after each element in the stream. -- -- @since 0.7.0 -{-# INLINE insertAfterEach #-} -insertAfterEach :: (IsStream t, MonadAsync m) => m a -> t m a -> t m a -insertAfterEach m = fromStreamD . D.insertAfterEach m . toStreamD +{-# INLINE intersperseSuffix #-} +intersperseSuffix :: (IsStream t, MonadAsync m) => m a -> t m a -> t m a +intersperseSuffix m = fromStreamD . D.intersperseSuffix m . toStreamD {- -- | Intersperse a monadic action into the input stream after every @n@ @@ -1854,16 +1873,16 @@ intersperseBySpan _n _f _xs = undefined -- seconds. -- -- @ --- > S.drain $ S.interject 1 (putChar ',') $ S.mapM (\\x -> threadDelay 1000000 >> putChar x) $ S.fromList "hello" +-- > S.drain $ S.interjectSuffix 1 (putChar ',') $ S.mapM (\\x -> threadDelay 1000000 >> putChar x) $ S.fromList "hello" -- "h,e,l,l,o" -- @ -- -- @since 0.7.0 -{-# INLINE interject #-} -interject +{-# INLINE interjectSuffix #-} +interjectSuffix :: (IsStream t, MonadAsync m) => Double -> m a -> t m a -> t m a -interject n f xs = xs `Par.parallelFst` repeatM timed +interjectSuffix n f xs = xs `Par.parallelFst` repeatM timed where timed = liftIO (threadDelay (round $ n * 1000000)) >> f -- | @insertBy cmp elem stream@ inserts @elem@ before the first element in @@ -2144,13 +2163,26 @@ concatMap f m = fromStreamD $ D.concatMap (toStreamD . f) (toStreamD m) append ::(IsStream t, Monad m) => t m b -> t m b -> t m b append m1 m2 = fromStreamD $ D.append (toStreamD m1) (toStreamD m2) --- Same as 'wSerial'. We should perhaps rename wSerial. If named explicitly --- this would be interleaveMax. +-- XXX Same as 'wSerial'. We should perhaps rename wSerial to interleave. +-- XXX Document the interleaving behavior of side effects in all the +-- interleaving combinators. +-- XXX Write time-domain equivalents of these. In the time domain we can +-- interleave two streams such that the value of second stream is always taken +-- from its last value even if no new value is being yielded, like +-- zipWithLatest. It would be something like interleaveWithLatest. -- -- | Interleaves the outputs of two streams, yielding elements from each stream -- alternately, starting from the first stream. If any of the streams finishes -- early the other stream continues alone until it too finishes. -- +-- >>> :set -XOverloadedStrings +-- >>> interleave "ab" ",,,," :: SerialT Identity Char +-- fromList "a,b,,," +-- >>> interleave "abcd" ",," :: SerialT Identity Char +-- fromList "a,b,cd" +-- +-- 'interleave' is dual to 'interleaveMin', it can be called @interleaveMax@. +-- -- Do not use at scale in concatMapWith. -- -- @since 0.7.0 @@ -2160,18 +2192,63 @@ interleave m1 m2 = fromStreamD $ D.interleave (toStreamD m1) (toStreamD m2) -- | Interleaves the outputs of two streams, yielding elements from each stream -- alternately, starting from the first stream. As soon as the first stream --- finishes the output stops discarding the second stream. +-- finishes, the output stops, discarding the remaining part of the second +-- stream. In this case, the last element in the resulting stream would be from +-- the second stream. If the second stream finishes early then the first stream +-- still continues to yield elements until it finishes. +-- +-- >>> :set -XOverloadedStrings +-- >>> interleaveSuffix "abc" ",,,," :: SerialT Identity Char +-- fromList "a,b,c," +-- >>> interleaveSuffix "abc" "," :: SerialT Identity Char +-- fromList "a,bc" +-- +-- 'interleaveSuffix' is a dual of 'interleaveInfix'. -- -- Do not use at scale in concatMapWith. -- -- @since 0.7.0 -{-# INLINE interleaveFst #-} -interleaveFst ::(IsStream t, Monad m) => t m b -> t m b -> t m b -interleaveFst m1 m2 = fromStreamD $ D.interleaveFst (toStreamD m1) (toStreamD m2) +{-# INLINE interleaveSuffix #-} +interleaveSuffix ::(IsStream t, Monad m) => t m b -> t m b -> t m b +interleaveSuffix m1 m2 = + fromStreamD $ D.interleaveSuffix (toStreamD m1) (toStreamD m2) + +-- | Interleaves the outputs of two streams, yielding elements from each stream +-- alternately, starting from the first stream and ending at the first stream. +-- If the second stream is longer than the first, elements from the second +-- stream are infixed with elements from the first stream. If the first stream +-- is longer then it continues yielding elements even after the second stream +-- has finished. +-- +-- >>> :set -XOverloadedStrings +-- >>> interleaveInfix "abc" ",,,," :: SerialT Identity Char +-- fromList "a,b,c" +-- >>> interleaveInfix "abc" "," :: SerialT Identity Char +-- fromList "a,bc" +-- +-- 'interleaveInfix' is a dual of 'interleaveSuffix'. +-- +-- Do not use at scale in concatMapWith. +-- +-- @since 0.7.0 +{-# INLINE interleaveInfix #-} +interleaveInfix ::(IsStream t, Monad m) => t m b -> t m b -> t m b +interleaveInfix m1 m2 = + fromStreamD $ D.interleaveInfix (toStreamD m1) (toStreamD m2) -- | Interleaves the outputs of two streams, yielding elements from each stream -- alternately, starting from the first stream. The output stops as soon as any --- of the two streams finishes discarding the other. +-- of the two streams finishes, discarding the remaining part of the other +-- stream. The last element of the resulting stream would be from the longer +-- stream. +-- +-- >>> :set -XOverloadedStrings +-- >>> interleaveMin "ab" ",,,," :: SerialT Identity Char +-- fromList "a,b," +-- >>> interleaveMin "abcd" ",," :: SerialT Identity Char +-- fromList "a,b,c" +-- +-- 'interleaveMin' is dual to 'interleave'. -- -- Do not use at scale in concatMapWith. -- @@ -2235,29 +2312,103 @@ concatUnfoldRoundrobin ::(IsStream t, Monad m) concatUnfoldRoundrobin u m = fromStreamD $ D.concatUnfoldRoundrobin u (toStreamD m) -{- --- | Insert a stream between segements of streams and flatten. -intercalate :: (IsStream t, MonadAsync m) - => SerialT Identity b -> (a -> t m b) -> t m a -> t m b -intercalate = undefined --} +-- XXX we can swap the order of arguments to gintercalate so that the +-- definition of concatUnfold becomes simpler? The first stream should be +-- infixed inside the second one. However, if we change the order in +-- "interleave" as well similarly, then that will make it a bit unintuitive. +-- +-- > concatUnfold unf str = +-- > gintercalate unf str (UF.nilM (\_ -> return ())) (repeat ()) +-- +-- | 'interleaveInfix' followed by unfold and concat. +-- +-- /Internal/ +{-# INLINE gintercalate #-} +gintercalate + :: (IsStream t, Monad m) + => Unfold m a c -> t m a -> Unfold m b c -> t m b -> t m c +gintercalate unf1 str1 unf2 str2 = + D.fromStreamD $ D.gintercalate + unf1 (D.toStreamD str1) + unf2 (D.toStreamD str2) --- | @intercalateSuffix genSuffix seed gen stream@ generates streams from seeds --- in the input @stream@ using the generator @gen@ and concatenates them --- after suffixing the stream generated by @genSuffix@ using @seed@. +-- XXX The order of arguments in "intercalate" is consistent with the list +-- intercalate but inconsistent with gintercalate and other stream interleaving +-- combinators. We can change the order of the arguments in other combinators +-- but then 'interleave' combinator may become a bit unintuitive because we +-- will be starting with the second stream. + +-- > intercalate seed unf str = gintercalate unf str unf (repeatM seed) +-- > intercalate a unf str = concatUnfold unf $ intersperse a str -- --- For example to insert CRLF in a stream of character strings: +-- | 'intersperse' followed by unfold and concat. -- --- > unlines = intercalateSuffix UF.fromList "\r\n" UF.fromList +-- > unwords = intercalate " " UF.fromList +-- +-- >>> intercalate " " UF.fromList ["abc", "def", "ghi"] +-- > "abc def ghi" +-- +{-# INLINE intercalate #-} +intercalate :: (IsStream t, Monad m) + => b -> Unfold m b c -> t m b -> t m c +intercalate seed unf str = D.fromStreamD $ + D.concatMapU unf $ D.intersperse seed (toStreamD str) + +-- > interpose x unf str = gintercalate unf str UF.identity (repeat x) +-- +-- | Unfold the elements of a stream, intersperse the given element between the +-- unfolded streams and then concat them into a single stream. +-- +-- > unwords = S.interpose ' ' +-- +-- /Internal/ +{-# INLINE interpose #-} +interpose :: (IsStream t, Monad m) + => c -> Unfold m b c -> t m b -> t m c +interpose x unf str = + D.fromStreamD $ D.interpose (return x) unf (D.toStreamD str) + +-- | 'interleaveSuffix' followed by unfold and concat. +-- +-- /Internal/ +{-# INLINE gintercalateSuffix #-} +gintercalateSuffix + :: (IsStream t, Monad m) + => Unfold m a c -> t m a -> Unfold m b c -> t m b -> t m c +gintercalateSuffix unf1 str1 unf2 str2 = + D.fromStreamD $ D.gintercalateSuffix + unf1 (D.toStreamD str1) + unf2 (D.toStreamD str2) + +-- > intercalateSuffix seed unf str = gintercalateSuffix unf str unf (repeatM seed) +-- > intercalateSuffix a unf str = concatUnfold unf $ intersperseSuffix a str +-- +-- | 'intersperseSuffix' followed by unfold and concat. +-- +-- > unlines = intercalateSuffix "\n" UF.fromList +-- +-- >>> intercalate "\n" UF.fromList ["abc", "def", "ghi"] +-- > "abc\ndef\nghi\n" -- {-# INLINE intercalateSuffix #-} -intercalateSuffix - :: (IsStream t, Monad m) - => Unfold m a c -> a -> Unfold m b c -> t m b -> t m c -intercalateSuffix suffix seed unf str = - D.fromStreamD $ D.interleaveFstThenConcat - unf (D.toStreamD str) - suffix (D.repeat seed) +intercalateSuffix :: (IsStream t, Monad m) + => b -> Unfold m b c -> t m b -> t m c +intercalateSuffix seed unf str = fromStreamD $ D.concatMapU unf + $ D.intersperseSuffix (return seed) (D.toStreamD str) + +-- interposeSuffix x unf str = gintercalateSuffix unf str UF.identity (repeat x) +-- +-- | Unfold the elements of a stream, append the given element after each +-- unfolded stream and then concat them into a single stream. +-- +-- > unlines = S.interposeSuffix '\n' +-- +-- /Internal/ +{-# INLINE interposeSuffix #-} +interposeSuffix :: (IsStream t, Monad m) + => c -> Unfold m b c -> t m b -> t m c +interposeSuffix x unf str = + D.fromStreamD $ D.interposeSuffix (return x) unf (D.toStreamD str) ------------------------------------------------------------------------------ -- Grouping/Splitting @@ -2407,7 +2558,7 @@ intervalsOf => Double -> Fold m a b -> t m a -> t m b intervalsOf n f xs = splitWithSuffix isNothing (FL.lcatMaybes f) - (interject n (return Nothing) (Serial.map Just xs)) + (interjectSuffix n (return Nothing) (Serial.map Just xs)) ------------------------------------------------------------------------------ -- Element Aware APIs diff --git a/src/Streamly/Streams/StreamD.hs b/src/Streamly/Streams/StreamD.hs index fc47d742e..9402fdd5e 100644 --- a/src/Streamly/Streams/StreamD.hs +++ b/src/Streamly/Streams/StreamD.hs @@ -65,6 +65,7 @@ module Streamly.Streams.StreamD -- ** Specialized Generation -- | Generate a monadic stream from a seed. , repeat + , repeatM , replicate , replicateM , fromIndices @@ -148,9 +149,13 @@ module Streamly.Streams.StreamD , InterleaveState(..) , interleave , interleaveMin - , interleaveFst + , interleaveSuffix + , interleaveInfix , roundRobin -- interleaveFair?/ParallelFair - , interleaveFstThenConcat + , gintercalateSuffix + , interposeSuffix + , gintercalate + , interpose -- ** Grouping , groupsOf @@ -229,7 +234,7 @@ module Streamly.Streams.StreamD -- * Inserting , intersperseM , intersperse - , insertAfterEach + , intersperseSuffix , insertBy -- * Deleting @@ -419,6 +424,9 @@ unfold (Unfold ustep inject) seed = Stream step Nothing -- Specialized Generation ------------------------------------------------------------------------------ +repeatM :: Monad m => m a -> Stream m a +repeatM x = Stream (\_ _ -> x >>= \r -> return $ Yield r ()) () + repeat :: Monad m => a -> Stream m a repeat x = Stream (\_ _ -> return $ Yield x ()) () @@ -2103,9 +2111,9 @@ interleaveMin (Stream step1 state1) (Stream step2 state2) = step _ (InterleaveFirstOnly _) = undefined step _ (InterleaveSecondOnly _) = undefined -{-# INLINE_NORMAL interleaveFst #-} -interleaveFst :: Monad m => Stream m a -> Stream m a -> Stream m a -interleaveFst (Stream step1 state1) (Stream step2 state2) = +{-# INLINE_NORMAL interleaveSuffix #-} +interleaveSuffix :: Monad m => Stream m a -> Stream m a -> Stream m a +interleaveSuffix (Stream step1 state1) (Stream step2 state2) = Stream step (InterleaveFirst state1 state2) where @@ -2134,6 +2142,52 @@ interleaveFst (Stream step1 state1) (Stream step2 state2) = step _ (InterleaveSecondOnly _) = undefined +data InterleaveInfixState s1 s2 a + = InterleaveInfixFirst s1 s2 + | InterleaveInfixSecondBuf s1 s2 + | InterleaveInfixSecondYield s1 s2 a + | InterleaveInfixFirstYield s1 s2 a + | InterleaveInfixFirstOnly s1 + +{-# INLINE_NORMAL interleaveInfix #-} +interleaveInfix :: Monad m => Stream m a -> Stream m a -> Stream m a +interleaveInfix (Stream step1 state1) (Stream step2 state2) = + Stream step (InterleaveInfixFirst state1 state2) + + where + + {-# INLINE_LATE step #-} + step gst (InterleaveInfixFirst st1 st2) = do + r <- step1 gst st1 + return $ case r of + Yield a s -> Yield a (InterleaveInfixSecondBuf s st2) + Skip s -> Skip (InterleaveInfixFirst s st2) + Stop -> Stop + + step gst (InterleaveInfixSecondBuf st1 st2) = do + r <- step2 gst st2 + return $ case r of + Yield a s -> Skip (InterleaveInfixSecondYield st1 s a) + Skip s -> Skip (InterleaveInfixSecondBuf st1 s) + Stop -> Skip (InterleaveInfixFirstOnly st1) + + step gst (InterleaveInfixSecondYield st1 st2 x) = do + r <- step1 gst st1 + return $ case r of + Yield a s -> Yield x (InterleaveInfixFirstYield s st2 a) + Skip s -> Skip (InterleaveInfixSecondYield s st2 x) + Stop -> Stop + + step _ (InterleaveInfixFirstYield st1 st2 x) = do + return $ Yield x (InterleaveInfixSecondBuf st1 st2) + + step gst (InterleaveInfixFirstOnly st1) = do + r <- step1 gst st1 + return $ case r of + Yield a s -> Yield a (InterleaveInfixFirstOnly s) + Skip s -> Skip (InterleaveInfixFirstOnly s) + Stop -> Stop + {-# INLINE_NORMAL roundRobin #-} roundRobin :: Monad m => Stream m a -> Stream m a -> Stream m a roundRobin (Stream step1 state1) (Stream step2 state2) = @@ -2190,11 +2244,11 @@ data ICUState s1 s2 i1 i2 = -- => [streamA1, streamB1, streamA2...StreamAn, streamBn] -- => [a11, a12, ...a1j, b11, b12, ...b1k, a21, a22, ...] -- -{-# INLINE_NORMAL interleaveFstThenConcat #-} -interleaveFstThenConcat +{-# INLINE_NORMAL gintercalateSuffix #-} +gintercalateSuffix :: Monad m => Unfold m a c -> Stream m a -> Unfold m b c -> Stream m b -> Stream m c -interleaveFstThenConcat +gintercalateSuffix (Unfold istep1 inject1) (Stream step1 state1) (Unfold istep2 inject2) (Stream step2 state2) = Stream step (ICUFirst state1 state2) @@ -2253,6 +2307,269 @@ interleaveFstThenConcat step _ (ICUSecondOnly _s2) = undefined step _ (ICUSecondOnlyInner _s2 _i2) = undefined +data InterposeSuffixState s1 i1 = + InterposeSuffixFirst s1 + -- | InterposeSuffixFirstYield s1 i1 + | InterposeSuffixFirstInner s1 i1 + | InterposeSuffixSecond s1 + +-- Note that if an unfolded layer turns out to be nil we still emit the +-- separator effect. An alternate behavior could be to emit the separator +-- effect only if at least one element has been yielded by the unfolding. +-- However, that becomes a bit complicated, so we have chosen the former +-- behvaior for now. +{-# INLINE_NORMAL interposeSuffix #-} +interposeSuffix + :: Monad m + => m c -> Unfold m b c -> Stream m b -> Stream m c +interposeSuffix + action + (Unfold istep1 inject1) (Stream step1 state1) = + Stream step (InterposeSuffixFirst state1) + + where + + {-# INLINE_LATE step #-} + step gst (InterposeSuffixFirst s1) = do + r <- step1 (adaptState gst) s1 + case r of + Yield a s -> do + i <- inject1 a + i `seq` return (Skip (InterposeSuffixFirstInner s i)) + -- i `seq` return (Skip (InterposeSuffixFirstYield s i)) + Skip s -> return $ Skip (InterposeSuffixFirst s) + Stop -> return Stop + + {- + step _ (InterposeSuffixFirstYield s1 i1) = do + r <- istep1 i1 + return $ case r of + Yield x i' -> Yield x (InterposeSuffixFirstInner s1 i') + Skip i' -> Skip (InterposeSuffixFirstYield s1 i') + Stop -> Skip (InterposeSuffixFirst s1) + -} + + step _ (InterposeSuffixFirstInner s1 i1) = do + r <- istep1 i1 + return $ case r of + Yield x i' -> Yield x (InterposeSuffixFirstInner s1 i') + Skip i' -> Skip (InterposeSuffixFirstInner s1 i') + Stop -> Skip (InterposeSuffixSecond s1) + + step _ (InterposeSuffixSecond s1) = do + r <- action + return $ Yield r (InterposeSuffixFirst s1) + +data ICALState s1 s2 i1 i2 a = + ICALFirst s1 s2 + -- | ICALFirstYield s1 s2 i1 + | ICALFirstInner s1 s2 i1 + | ICALFirstOnly s1 + | ICALFirstOnlyInner s1 i1 + | ICALSecondInject s1 s2 + | ICALFirstInject s1 s2 i2 + -- | ICALFirstBuf s1 s2 i1 i2 + | ICALSecondInner s1 s2 i1 i2 + -- -- | ICALSecondInner s1 s2 i1 i2 a + -- -- | ICALFirstResume s1 s2 i1 i2 a + +-- | Interleave streams (full streams, not the elements) unfolded from two +-- input streams and concat. Stop when the first stream stops. If the second +-- stream ends before the first one then first stream still keeps running alone +-- without any interleaving with the second stream. +-- +-- [a1, a2, ... an] [b1, b2 ...] +-- => [streamA1, streamA2, ... streamAn] [streamB1, streamB2, ...] +-- => [streamA1, streamB1, streamA2...StreamAn, streamBn] +-- => [a11, a12, ...a1j, b11, b12, ...b1k, a21, a22, ...] +-- +{-# INLINE_NORMAL gintercalate #-} +gintercalate + :: Monad m + => Unfold m a c -> Stream m a -> Unfold m b c -> Stream m b -> Stream m c +gintercalate + (Unfold istep1 inject1) (Stream step1 state1) + (Unfold istep2 inject2) (Stream step2 state2) = + Stream step (ICALFirst state1 state2) + + where + + {-# INLINE_LATE step #-} + step gst (ICALFirst s1 s2) = do + r <- step1 (adaptState gst) s1 + case r of + Yield a s -> do + i <- inject1 a + i `seq` return (Skip (ICALFirstInner s s2 i)) + -- i `seq` return (Skip (ICALFirstYield s s2 i)) + Skip s -> return $ Skip (ICALFirst s s2) + Stop -> return Stop + + {- + step _ (ICALFirstYield s1 s2 i1) = do + r <- istep1 i1 + return $ case r of + Yield x i' -> Yield x (ICALFirstInner s1 s2 i') + Skip i' -> Skip (ICALFirstYield s1 s2 i') + Stop -> Skip (ICALFirst s1 s2) + -} + + step _ (ICALFirstInner s1 s2 i1) = do + r <- istep1 i1 + return $ case r of + Yield x i' -> Yield x (ICALFirstInner s1 s2 i') + Skip i' -> Skip (ICALFirstInner s1 s2 i') + Stop -> Skip (ICALSecondInject s1 s2) + + step gst (ICALFirstOnly s1) = do + r <- step1 (adaptState gst) s1 + case r of + Yield a s -> do + i <- inject1 a + i `seq` return (Skip (ICALFirstOnlyInner s i)) + Skip s -> return $ Skip (ICALFirstOnly s) + Stop -> return Stop + + step _ (ICALFirstOnlyInner s1 i1) = do + r <- istep1 i1 + return $ case r of + Yield x i' -> Yield x (ICALFirstOnlyInner s1 i') + Skip i' -> Skip (ICALFirstOnlyInner s1 i') + Stop -> Skip (ICALFirstOnly s1) + + -- We inject the second stream even before checking if the first stream + -- would yield any more elements. There is no clear choice whether we + -- should do this before or after that. Doing it after may make the state + -- machine a bit simpler though. + step gst (ICALSecondInject s1 s2) = do + r <- step2 (adaptState gst) s2 + case r of + Yield a s -> do + i <- inject2 a + i `seq` return (Skip (ICALFirstInject s1 s i)) + Skip s -> return $ Skip (ICALSecondInject s1 s) + Stop -> return $ Skip (ICALFirstOnly s1) + + step gst (ICALFirstInject s1 s2 i2) = do + r <- step1 (adaptState gst) s1 + case r of + Yield a s -> do + i <- inject1 a + i `seq` return (Skip (ICALSecondInner s s2 i i2)) + -- i `seq` return (Skip (ICALFirstBuf s s2 i i2)) + Skip s -> return $ Skip (ICALFirstInject s s2 i2) + Stop -> return Stop + + {- + step _ (ICALFirstBuf s1 s2 i1 i2) = do + r <- istep1 i1 + return $ case r of + Yield x i' -> Skip (ICALSecondInner s1 s2 i' i2 x) + Skip i' -> Skip (ICALFirstBuf s1 s2 i' i2) + Stop -> Stop + + step _ (ICALSecondInner s1 s2 i1 i2 v) = do + r <- istep2 i2 + return $ case r of + Yield x i' -> Yield x (ICALSecondInner s1 s2 i1 i' v) + Skip i' -> Skip (ICALSecondInner s1 s2 i1 i' v) + Stop -> Skip (ICALFirstResume s1 s2 i1 i2 v) + -} + + step _ (ICALSecondInner s1 s2 i1 i2) = do + r <- istep2 i2 + return $ case r of + Yield x i' -> Yield x (ICALSecondInner s1 s2 i1 i') + Skip i' -> Skip (ICALSecondInner s1 s2 i1 i') + Stop -> Skip (ICALFirstInner s1 s2 i1) + -- Stop -> Skip (ICALFirstResume s1 s2 i1 i2) + + {- + step _ (ICALFirstResume s1 s2 i1 i2 x) = do + return $ Yield x (ICALFirstInner s1 s2 i1 i2) + -} + +data InterposeState s1 i1 a = + InterposeFirst s1 + -- | InterposeFirstYield s1 i1 + | InterposeFirstInner s1 i1 + | InterposeFirstInject s1 + -- | InterposeFirstBuf s1 i1 + | InterposeSecondYield s1 i1 + -- -- | InterposeSecondYield s1 i1 a + -- -- | InterposeFirstResume s1 i1 a + +-- Note that this only interposes the pure values, we may run many effects to +-- generate those values as some effects may not generate anything (Skip). +{-# INLINE_NORMAL interpose #-} +interpose :: Monad m => m c -> Unfold m b c -> Stream m b -> Stream m c +interpose + action + (Unfold istep1 inject1) (Stream step1 state1) = + Stream step (InterposeFirst state1) + + where + + {-# INLINE_LATE step #-} + step gst (InterposeFirst s1) = do + r <- step1 (adaptState gst) s1 + case r of + Yield a s -> do + i <- inject1 a + i `seq` return (Skip (InterposeFirstInner s i)) + -- i `seq` return (Skip (InterposeFirstYield s i)) + Skip s -> return $ Skip (InterposeFirst s) + Stop -> return Stop + + {- + step _ (InterposeFirstYield s1 i1) = do + r <- istep1 i1 + return $ case r of + Yield x i' -> Yield x (InterposeFirstInner s1 i') + Skip i' -> Skip (InterposeFirstYield s1 i') + Stop -> Skip (InterposeFirst s1) + -} + + step _ (InterposeFirstInner s1 i1) = do + r <- istep1 i1 + return $ case r of + Yield x i' -> Yield x (InterposeFirstInner s1 i') + Skip i' -> Skip (InterposeFirstInner s1 i') + Stop -> Skip (InterposeFirstInject s1) + + step gst (InterposeFirstInject s1) = do + r <- step1 (adaptState gst) s1 + case r of + Yield a s -> do + i <- inject1 a + -- i `seq` return (Skip (InterposeFirstBuf s i)) + i `seq` return (Skip (InterposeSecondYield s i)) + Skip s -> return $ Skip (InterposeFirstInject s) + Stop -> return Stop + + {- + step _ (InterposeFirstBuf s1 i1) = do + r <- istep1 i1 + return $ case r of + Yield x i' -> Skip (InterposeSecondYield s1 i' x) + Skip i' -> Skip (InterposeFirstBuf s1 i') + Stop -> Stop + -} + + {- + step _ (InterposeSecondYield s1 i1 v) = do + r <- action + return $ Yield r (InterposeFirstResume s1 i1 v) + -} + step _ (InterposeSecondYield s1 i1) = do + r <- action + return $ Yield r (InterposeFirstInner s1 i1) + + {- + step _ (InterposeFirstResume s1 i1 v) = do + return $ Yield v (InterposeFirstInner s1 i1) + -} + ------------------------------------------------------------------------------ -- Exceptions ------------------------------------------------------------------------------ @@ -2853,9 +3170,9 @@ data SuffixState s a | SuffixSuffix s | SuffixYield a (SuffixState s a) -{-# INLINE_NORMAL insertAfterEach #-} -insertAfterEach :: forall m a. Monad m => m a -> Stream m a -> Stream m a -insertAfterEach action (Stream step state) = Stream step' (SuffixElem state) +{-# INLINE_NORMAL intersperseSuffix #-} +intersperseSuffix :: forall m a. Monad m => m a -> Stream m a -> Stream m a +intersperseSuffix action (Stream step state) = Stream step' (SuffixElem state) where {-# INLINE_LATE step' #-} step' gst (SuffixElem st) = do diff --git a/src/Streamly/Streams/StreamK.hs b/src/Streamly/Streams/StreamK.hs index 95ccfbbfc..2e8ca3b69 100644 --- a/src/Streamly/Streams/StreamK.hs +++ b/src/Streamly/Streams/StreamK.hs @@ -62,6 +62,7 @@ module Streamly.Streams.StreamK -- ** Specialized Generation , repeat + , repeatM , replicate , replicateM , fromIndices @@ -268,9 +269,16 @@ once = yieldM -- repeatM = cycle1 . yield -- @ -- +-- Generate an infinite stream by repeating a monadic value. +-- +-- /Internal/ +repeatM :: (IsStream t, MonadAsync m) => m a -> t m a +repeatM = go + where go m = m |: go m + -- Generate an infinite stream by repeating a pure value. -- --- @since 0.4.0 +-- /Internal/ {-# INLINE repeat #-} repeat :: IsStream t => a -> t m a repeat a = let x = cons a x in x