diff --git a/src/Streamly/Internal/Data/Stream/IsStream/Expand.hs b/src/Streamly/Internal/Data/Stream/IsStream/Expand.hs index 9e566f748..3da2a621a 100644 --- a/src/Streamly/Internal/Data/Stream/IsStream/Expand.hs +++ b/src/Streamly/Internal/Data/Stream/IsStream/Expand.hs @@ -52,8 +52,8 @@ module Streamly.Internal.Data.Stream.IsStream.Expand -- streams that come earlier in the composition. -- , wSerial - , WSerial.wSerialFst - , WSerial.wSerialMin + , Serial.wSerialFst + , Serial.wSerialMin -- ** Interleave -- | 'interleave' is like 'wSerial' but using a direct style @@ -175,7 +175,7 @@ import Streamly.Data.Unfold (Unfold) import qualified Streamly.Internal.Data.Stream.IsStream.Type as IsStream import qualified Streamly.Internal.Data.Stream.Parallel as Par -import qualified Streamly.Internal.Data.Stream.WSerial as WSerial +import qualified Streamly.Internal.Data.Stream.Serial as Serial import qualified Streamly.Internal.Data.Stream.StreamD as D (append, interleave, interleaveSuffix, interleaveInfix, interleaveMin , roundRobin, mergeByM, unfoldMany, unfoldManyInterleave, intersperse diff --git a/src/Streamly/Internal/Data/Stream/IsStream/Generate.hs b/src/Streamly/Internal/Data/Stream/IsStream/Generate.hs index 56da1c01c..15be0ffd7 100644 --- a/src/Streamly/Internal/Data/Stream/IsStream/Generate.hs +++ b/src/Streamly/Internal/Data/Stream/IsStream/Generate.hs @@ -100,8 +100,7 @@ import Streamly.Internal.Data.Stream.IsStream.Common , yield, yieldM, repeatM) import Streamly.Internal.Data.Stream.IsStream.Type (IsStream (..), fromSerial, consM, fromStreamD) -import Streamly.Internal.Data.Stream.Serial (SerialT) -import Streamly.Internal.Data.Stream.WSerial (WSerialT) +import Streamly.Internal.Data.Stream.Serial (SerialT, WSerialT) import Streamly.Internal.Data.Stream.Zip (ZipSerialM) import Streamly.Internal.Data.Time.Units (AbsTime , RelTime64, addToAbsTime64) import Streamly.Internal.Data.Unboxed (Unboxed) diff --git a/src/Streamly/Internal/Data/Stream/IsStream/Type.hs b/src/Streamly/Internal/Data/Stream/IsStream/Type.hs index 04f81f963..075d5803b 100644 --- a/src/Streamly/Internal/Data/Stream/IsStream/Type.hs +++ b/src/Streamly/Internal/Data/Stream/IsStream/Type.hs @@ -101,8 +101,8 @@ where import Streamly.Internal.Control.Concurrent (MonadAsync) import Streamly.Internal.Data.Fold.Type (Fold (..)) -import Streamly.Internal.Data.Stream.Serial (SerialT, Serial) -import Streamly.Internal.Data.Stream.WSerial (WSerialT(..), WSerial) +import Streamly.Internal.Data.Stream.Serial + (SerialT, Serial, WSerialT(..), WSerial) import Streamly.Internal.Data.Stream.Async (AsyncT(..), Async, WAsyncT(..), WAsync) import Streamly.Internal.Data.Stream.Ahead (AheadT(..), Ahead) @@ -116,7 +116,6 @@ import qualified Streamly.Internal.Data.Stream.Ahead as Ahead import qualified Streamly.Internal.Data.Stream.Async as Async import qualified Streamly.Internal.Data.Stream.Parallel as Parallel import qualified Streamly.Internal.Data.Stream.Serial as Serial -import qualified Streamly.Internal.Data.Stream.WSerial as WSerial import qualified Streamly.Internal.Data.Stream.StreamD.Type as D (Stream(..), toStreamK, fromStreamK , drain, eqBy, cmpBy, fromList, toList, foldrMx, foldlMx' @@ -470,12 +469,12 @@ instance IsStream WSerialT where {-# INLINE consM #-} {-# SPECIALIZE consM :: IO a -> WSerialT IO a -> WSerialT IO a #-} consM :: Monad m => m a -> WSerialT m a -> WSerialT m a - consM = WSerial.consMWSerial + consM = Serial.consMWSerial {-# INLINE (|:) #-} {-# SPECIALIZE (|:) :: IO a -> WSerialT IO a -> WSerialT IO a #-} (|:) :: Monad m => m a -> WSerialT m a -> WSerialT m a - (|:) = WSerial.consMWSerial + (|:) = Serial.consMWSerial ------------------------------------------------------------------------------- -- Async diff --git a/src/Streamly/Internal/Data/Stream/Serial.hs b/src/Streamly/Internal/Data/Stream/Serial.hs index 3b31fbe01..2ccdcd2ad 100644 --- a/src/Streamly/Internal/Data/Stream/Serial.hs +++ b/src/Streamly/Internal/Data/Stream/Serial.hs @@ -1,3 +1,5 @@ +{-# LANGUAGE UndecidableInstances #-} + -- | -- Module : Streamly.Internal.Data.Stream.Serial -- Copyright : (c) 2017 Composewell Technologies @@ -10,6 +12,7 @@ -- To run examples in this module: -- -- >>> import qualified Streamly.Data.Stream as Stream +-- >>> import qualified Streamly.Prelude as IsStream -- module Streamly.Internal.Data.Stream.Serial ( @@ -18,6 +21,14 @@ module Streamly.Internal.Data.Stream.Serial , Serial , serial + -- * Serial interleaving stream + , WSerialT(..) + , WSerial + , wSerial + , wSerialFst + , wSerialMin + , consMWSerial + -- * Construction , Stream.cons , Stream.consM @@ -34,15 +45,42 @@ module Streamly.Internal.Data.Stream.Serial ) where -import GHC.Exts (IsList(..)) +import Control.Applicative (liftA2) +import Control.DeepSeq (NFData(..), NFData1(..)) +import Control.Monad.Base (MonadBase(..), liftBaseDefault) +import Control.Monad.Catch (MonadThrow, throwM) +import Control.Monad.IO.Class (MonadIO(..)) +import Control.Monad.Reader.Class (MonadReader(..)) +import Control.Monad.State.Class (MonadState(..)) +import Control.Monad.Trans.Class (MonadTrans(lift)) +import Data.Foldable (Foldable(foldl'), fold) +import Data.Functor.Identity (Identity(..), runIdentity) +import Data.Maybe (fromMaybe) +import Data.Semigroup (Endo(..)) +import GHC.Exts (IsList(..), IsString(..), oneShot) +import Text.Read + ( Lexeme(Ident), lexP, parens, prec, readPrec, readListPrec + , readListPrecDefault) +import Streamly.Internal.BaseCompat ((#.)) +import Streamly.Internal.Data.Maybe.Strict (Maybe'(..), toMaybe) import Streamly.Data.Stream (Stream) import qualified Streamly.Data.Stream as Stream +import qualified Streamly.Internal.Data.Stream.Common as P +import qualified Streamly.Internal.Data.Stream.StreamD as D + (fromStreamK, toStreamK, mapM) +import qualified Streamly.Internal.Data.Stream.StreamK.Type as K + (Stream, cons, consM, nil, concatMapWith, fromPure, bindWith + , withLocal, interleave, interleaveFst, interleaveMin) -import Prelude hiding (map) +import Prelude hiding (map, mapM, repeat, filter) + +#include "Instances.hs" +#include "inline.hs" -- $setup -- >>> import qualified Streamly.Data.Stream as Stream +-- >>> import qualified Streamly.Prelude as IsStream ------------------------------------------------------------------------------ -- SerialT @@ -82,3 +120,146 @@ serial = (<>) {-# INLINE map #-} map :: Monad m => (a -> b) -> SerialT m a -> SerialT m b map f = Stream.mapM (return . f) + +------------------------------------------------------------------------------ +-- WSerialT +------------------------------------------------------------------------------ + +-- | For 'WSerialT' streams: +-- +-- @ +-- (<>) = 'Streamly.Prelude.wSerial' -- 'Semigroup' +-- (>>=) = flip . 'Streamly.Prelude.concatMapWith' 'Streamly.Prelude.wSerial' -- 'Monad' +-- @ +-- +-- Note that '<>' is associative only if we disregard the ordering of elements +-- in the resulting stream. +-- +-- A single 'Monad' bind behaves like a @for@ loop: +-- +-- >>> :{ +-- IsStream.toList $ IsStream.fromWSerial $ do +-- x <- IsStream.fromList [1,2] -- foreach x in stream +-- return x +-- :} +-- [1,2] +-- +-- Nested monad binds behave like interleaved nested @for@ loops: +-- +-- >>> :{ +-- IsStream.toList $ IsStream.fromWSerial $ do +-- x <- IsStream.fromList [1,2] -- foreach x in stream +-- y <- IsStream.fromList [3,4] -- foreach y in stream +-- return (x, y) +-- :} +-- [(1,3),(2,3),(1,4),(2,4)] +-- +-- It is a result of interleaving all the nested iterations corresponding to +-- element @1@ in the first stream with all the nested iterations of element +-- @2@: +-- +-- >>> import Streamly.Prelude (wSerial) +-- >>> IsStream.toList $ IsStream.fromList [(1,3),(1,4)] `IsStream.wSerial` IsStream.fromList [(2,3),(2,4)] +-- [(1,3),(2,3),(1,4),(2,4)] +-- +-- The @W@ in the name stands for @wide@ or breadth wise scheduling in +-- contrast to the depth wise scheduling behavior of 'Stream'. +-- +-- /Since: 0.2.0 ("Streamly")/ +-- +-- @since 0.8.0 +newtype WSerialT m a = WSerialT {getWSerialT :: K.Stream m a} + deriving (MonadTrans) + +-- | An interleaving serial IO stream of elements of type @a@. See 'WSerialT' +-- documentation for more details. +-- +-- /Since: 0.2.0 ("Streamly")/ +-- +-- @since 0.8.0 +type WSerial = WSerialT IO + +{-# INLINE consMWSerial #-} +{-# SPECIALIZE consMWSerial :: IO a -> WSerialT IO a -> WSerialT IO a #-} +consMWSerial :: Monad m => m a -> WSerialT m a -> WSerialT m a +consMWSerial m (WSerialT ms) = WSerialT $ K.consM m ms + +------------------------------------------------------------------------------ +-- Semigroup +------------------------------------------------------------------------------ + +infixr 6 `wSerial` + +-- | Interleaves two streams, yielding one element from each stream +-- alternately. When one stream stops the rest of the other stream is used in +-- the output stream. +-- +-- This gives exponential priority to earlier streams than the ones joining +-- later. Because of exponential weighting it can be used with 'concatMapWith'. +-- +-- /Not fused/ + +-- NOTE: +-- +-- Note that evaluation of @a \`wSerial` b \`wSerial` c@ does not interleave +-- @a@, @b@ and @c@ with equal priority. This expression is equivalent to @a +-- \`wSerial` (b \`wSerial` c)@, therefore, it fairly interleaves @a@ with the +-- result of @b \`wSerial` c@. For example, @Stream.fromList [1,2] \`wSerial` +-- Stream.fromList [3,4] \`wSerial` Stream.fromList [5,6]@ would result in +-- [1,3,2,5,4,6]. In other words, the leftmost stream gets the same scheduling +-- priority as the rest of the streams taken together. The same is true for +-- each subexpression on the right. +-- +{-# INLINE wSerial #-} +wSerial :: WSerialT m a -> WSerialT m a -> WSerialT m a +wSerial (WSerialT m1) (WSerialT m2) = WSerialT $ K.interleave m1 m2 + +{-# INLINE wSerialFst #-} +wSerialFst :: WSerialT m a -> WSerialT m a -> WSerialT m a +wSerialFst (WSerialT m1) (WSerialT m2) = WSerialT $ K.interleaveFst m1 m2 + +{-# INLINE wSerialMin #-} +wSerialMin :: WSerialT m a -> WSerialT m a -> WSerialT m a +wSerialMin (WSerialT m1) (WSerialT m2) = WSerialT $ K.interleaveMin m1 m2 + +instance Semigroup (WSerialT m a) where + (<>) = wSerial + +------------------------------------------------------------------------------ +-- Monoid +------------------------------------------------------------------------------ + +instance Monoid (WSerialT m a) where + mempty = WSerialT K.nil + mappend = (<>) + +{-# INLINE apWSerial #-} +apWSerial :: Monad m => WSerialT m (a -> b) -> WSerialT m a -> WSerialT m b +apWSerial (WSerialT m1) (WSerialT m2) = + let f x1 = K.concatMapWith K.interleave (pure . x1) m2 + in WSerialT $ K.concatMapWith K.interleave f m1 + +instance Monad m => Applicative (WSerialT m) where + {-# INLINE pure #-} + pure = WSerialT . K.fromPure + {-# INLINE (<*>) #-} + (<*>) = apWSerial + +------------------------------------------------------------------------------ +-- Monad +------------------------------------------------------------------------------ + +instance Monad m => Monad (WSerialT m) where + return = pure + {-# INLINE (>>=) #-} + (>>=) (WSerialT m) f = WSerialT $ K.bindWith K.interleave m (getWSerialT . f) + +------------------------------------------------------------------------------ +-- Other instances +------------------------------------------------------------------------------ + +MONAD_COMMON_INSTANCES(WSerialT,) +LIST_INSTANCES(WSerialT) +NFDATA1_INSTANCE(WSerialT) +FOLDABLE_INSTANCE(WSerialT) +TRAVERSABLE_INSTANCE(WSerialT) diff --git a/src/Streamly/Internal/Data/Stream/WSerial.hs b/src/Streamly/Internal/Data/Stream/WSerial.hs deleted file mode 100644 index b0ccbfc3d..000000000 --- a/src/Streamly/Internal/Data/Stream/WSerial.hs +++ /dev/null @@ -1,205 +0,0 @@ -{-# LANGUAGE UndecidableInstances #-} - --- | --- Module : Streamly.Internal.Data.Stream.WSerial --- Copyright : (c) 2017 Composewell Technologies --- --- License : BSD3 --- Maintainer : streamly@composewell.com --- Stability : experimental --- Portability : GHC --- --- To run examples in this module: --- --- >>> import qualified Streamly.Prelude as Stream --- -module Streamly.Internal.Data.Stream.WSerial - ( - -- * Serial interleaving stream - WSerialT(..) - , WSerial - , wSerial - , wSerialFst - , wSerialMin - , consMWSerial - ) -where - -import Control.Applicative (liftA2) -import Control.DeepSeq (NFData(..)) -import Control.DeepSeq (NFData1(..)) -import Control.Monad.Base (MonadBase(..), liftBaseDefault) -import Control.Monad.Catch (MonadThrow, throwM) -import Control.Monad.IO.Class (MonadIO(..)) -import Control.Monad.Reader.Class (MonadReader(..)) -import Control.Monad.State.Class (MonadState(..)) -import Control.Monad.Trans.Class (MonadTrans(lift)) -import Data.Foldable (Foldable(foldl'), fold) -import Data.Functor.Identity (Identity(..), runIdentity) -import Data.Maybe (fromMaybe) -import Data.Semigroup (Endo(..)) -import GHC.Exts (IsList(..), IsString(..), oneShot) -import Text.Read - ( Lexeme(Ident), lexP, parens, prec, readPrec, readListPrec - , readListPrecDefault) -import Streamly.Internal.BaseCompat ((#.)) -import Streamly.Internal.Data.Maybe.Strict (Maybe'(..), toMaybe) -import Streamly.Internal.Data.Stream.StreamK.Type (Stream) - -import qualified Streamly.Internal.Data.Stream.Common as P -import qualified Streamly.Internal.Data.Stream.StreamD as D - (fromStreamK, toStreamK, mapM) -import qualified Streamly.Internal.Data.Stream.StreamK.Type as K - (cons, consM, nil, concatMapWith, fromPure, bindWith - , withLocal, interleave, interleaveFst, interleaveMin) - -import Prelude hiding (map, mapM, repeat, filter) - -#include "Instances.hs" -#include "inline.hs" - --- $setup --- >>> import qualified Streamly.Prelude as Stream - ------------------------------------------------------------------------------- --- WSerialT ------------------------------------------------------------------------------- - --- | For 'WSerialT' streams: --- --- @ --- (<>) = 'Streamly.Prelude.wSerial' -- 'Semigroup' --- (>>=) = flip . 'Streamly.Prelude.concatMapWith' 'Streamly.Prelude.wSerial' -- 'Monad' --- @ --- --- Note that '<>' is associative only if we disregard the ordering of elements --- in the resulting stream. --- --- A single 'Monad' bind behaves like a @for@ loop: --- --- >>> :{ --- Stream.toList $ Stream.fromWSerial $ do --- x <- Stream.fromList [1,2] -- foreach x in stream --- return x --- :} --- [1,2] --- --- Nested monad binds behave like interleaved nested @for@ loops: --- --- >>> :{ --- Stream.toList $ Stream.fromWSerial $ do --- x <- Stream.fromList [1,2] -- foreach x in stream --- y <- Stream.fromList [3,4] -- foreach y in stream --- return (x, y) --- :} --- [(1,3),(2,3),(1,4),(2,4)] --- --- It is a result of interleaving all the nested iterations corresponding to --- element @1@ in the first stream with all the nested iterations of element --- @2@: --- --- >>> import Streamly.Prelude (wSerial) --- >>> Stream.toList $ Stream.fromList [(1,3),(1,4)] `Stream.wSerial` Stream.fromList [(2,3),(2,4)] --- [(1,3),(2,3),(1,4),(2,4)] --- --- The @W@ in the name stands for @wide@ or breadth wise scheduling in --- contrast to the depth wise scheduling behavior of 'Stream'. --- --- /Since: 0.2.0 ("Streamly")/ --- --- @since 0.8.0 -newtype WSerialT m a = WSerialT {getWSerialT :: Stream m a} - deriving (MonadTrans) - --- | An interleaving serial IO stream of elements of type @a@. See 'WSerialT' --- documentation for more details. --- --- /Since: 0.2.0 ("Streamly")/ --- --- @since 0.8.0 -type WSerial = WSerialT IO - -{-# INLINE consMWSerial #-} -{-# SPECIALIZE consMWSerial :: IO a -> WSerialT IO a -> WSerialT IO a #-} -consMWSerial :: Monad m => m a -> WSerialT m a -> WSerialT m a -consMWSerial m (WSerialT ms) = WSerialT $ K.consM m ms - ------------------------------------------------------------------------------- --- Semigroup ------------------------------------------------------------------------------- - -infixr 6 `wSerial` - --- | Interleaves two streams, yielding one element from each stream --- alternately. When one stream stops the rest of the other stream is used in --- the output stream. --- --- This gives exponential priority to earlier streams than the ones joining --- later. Because of exponential weighting it can be used with 'concatMapWith'. --- --- /Not fused/ - --- NOTE: --- --- Note that evaluation of @a \`wSerial` b \`wSerial` c@ does not interleave --- @a@, @b@ and @c@ with equal priority. This expression is equivalent to @a --- \`wSerial` (b \`wSerial` c)@, therefore, it fairly interleaves @a@ with the --- result of @b \`wSerial` c@. For example, @Stream.fromList [1,2] \`wSerial` --- Stream.fromList [3,4] \`wSerial` Stream.fromList [5,6]@ would result in --- [1,3,2,5,4,6]. In other words, the leftmost stream gets the same scheduling --- priority as the rest of the streams taken together. The same is true for --- each subexpression on the right. --- -{-# INLINE wSerial #-} -wSerial :: WSerialT m a -> WSerialT m a -> WSerialT m a -wSerial (WSerialT m1) (WSerialT m2) = WSerialT $ K.interleave m1 m2 - -{-# INLINE wSerialFst #-} -wSerialFst :: WSerialT m a -> WSerialT m a -> WSerialT m a -wSerialFst (WSerialT m1) (WSerialT m2) = WSerialT $ K.interleaveFst m1 m2 - -{-# INLINE wSerialMin #-} -wSerialMin :: WSerialT m a -> WSerialT m a -> WSerialT m a -wSerialMin (WSerialT m1) (WSerialT m2) = WSerialT $ K.interleaveMin m1 m2 - -instance Semigroup (WSerialT m a) where - (<>) = wSerial - ------------------------------------------------------------------------------- --- Monoid ------------------------------------------------------------------------------- - -instance Monoid (WSerialT m a) where - mempty = WSerialT K.nil - mappend = (<>) - -{-# INLINE apWSerial #-} -apWSerial :: Monad m => WSerialT m (a -> b) -> WSerialT m a -> WSerialT m b -apWSerial (WSerialT m1) (WSerialT m2) = - let f x1 = K.concatMapWith K.interleave (pure . x1) m2 - in WSerialT $ K.concatMapWith K.interleave f m1 - -instance Monad m => Applicative (WSerialT m) where - {-# INLINE pure #-} - pure = WSerialT . K.fromPure - {-# INLINE (<*>) #-} - (<*>) = apWSerial - ------------------------------------------------------------------------------- --- Monad ------------------------------------------------------------------------------- - -instance Monad m => Monad (WSerialT m) where - return = pure - {-# INLINE (>>=) #-} - (>>=) (WSerialT m) f = WSerialT $ K.bindWith K.interleave m (getWSerialT . f) - ------------------------------------------------------------------------------- --- Other instances ------------------------------------------------------------------------------- - -MONAD_COMMON_INSTANCES(WSerialT,) -LIST_INSTANCES(WSerialT) -NFDATA1_INSTANCE(WSerialT) -FOLDABLE_INSTANCE(WSerialT) -TRAVERSABLE_INSTANCE(WSerialT) diff --git a/streamly.cabal b/streamly.cabal index 26ce5ca89..c5411be9b 100644 --- a/streamly.cabal +++ b/streamly.cabal @@ -344,7 +344,6 @@ library , Streamly.Internal.Data.Stream.SVar.Eliminate , Streamly.Internal.Data.Stream.Serial - , Streamly.Internal.Data.Stream.WSerial , Streamly.Internal.Data.Stream.Async , Streamly.Internal.Data.Stream.Parallel , Streamly.Internal.Data.Stream.Channel.Types diff --git a/test/Streamly/Test/Prelude/WSerial.hs b/test/Streamly/Test/Prelude/WSerial.hs index b5dd07843..184d50537 100644 --- a/test/Streamly/Test/Prelude/WSerial.hs +++ b/test/Streamly/Test/Prelude/WSerial.hs @@ -15,7 +15,7 @@ import Test.Hspec.QuickCheck import Test.QuickCheck.Monadic (monadicIO, run) import Test.Hspec as H -import qualified Streamly.Internal.Data.Stream.WSerial as WSerial +import qualified Streamly.Internal.Data.Stream.Serial as Serial import Streamly.Prelude hiding (repeat) import qualified Streamly.Prelude as S @@ -64,7 +64,7 @@ wSerialMinLengthProp = finiteStream len = S.take len $ S.repeat (1 :: Int) infiniteStream = S.repeat 1 - combined len = infiniteStream `WSerial.wSerialMin` finiteStream len + combined len = infiniteStream `Serial.wSerialMin` finiteStream len moduleName :: String moduleName = "Prelude.WSerial"