Revert "Split Serial module into Serial and WSerial"

This reverts commit 5ddfb45764.
This commit is contained in:
Harendra Kumar 2022-10-13 01:27:53 +05:30
parent 98da856b53
commit d3ec4917d9
7 changed files with 193 additions and 220 deletions

View File

@ -52,8 +52,8 @@ module Streamly.Internal.Data.Stream.IsStream.Expand
-- streams that come earlier in the composition.
--
, wSerial
, WSerial.wSerialFst
, WSerial.wSerialMin
, Serial.wSerialFst
, Serial.wSerialMin
-- ** Interleave
-- | 'interleave' is like 'wSerial' but using a direct style
@ -175,7 +175,7 @@ import Streamly.Data.Unfold (Unfold)
import qualified Streamly.Internal.Data.Stream.IsStream.Type as IsStream
import qualified Streamly.Internal.Data.Stream.Parallel as Par
import qualified Streamly.Internal.Data.Stream.WSerial as WSerial
import qualified Streamly.Internal.Data.Stream.Serial as Serial
import qualified Streamly.Internal.Data.Stream.StreamD as D
(append, interleave, interleaveSuffix, interleaveInfix, interleaveMin
, roundRobin, mergeByM, unfoldMany, unfoldManyInterleave, intersperse

View File

@ -100,8 +100,7 @@ import Streamly.Internal.Data.Stream.IsStream.Common
, yield, yieldM, repeatM)
import Streamly.Internal.Data.Stream.IsStream.Type
(IsStream (..), fromSerial, consM, fromStreamD)
import Streamly.Internal.Data.Stream.Serial (SerialT)
import Streamly.Internal.Data.Stream.WSerial (WSerialT)
import Streamly.Internal.Data.Stream.Serial (SerialT, WSerialT)
import Streamly.Internal.Data.Stream.Zip (ZipSerialM)
import Streamly.Internal.Data.Time.Units (AbsTime , RelTime64, addToAbsTime64)
import Streamly.Internal.Data.Unboxed (Unboxed)

View File

@ -101,8 +101,8 @@ where
import Streamly.Internal.Control.Concurrent (MonadAsync)
import Streamly.Internal.Data.Fold.Type (Fold (..))
import Streamly.Internal.Data.Stream.Serial (SerialT, Serial)
import Streamly.Internal.Data.Stream.WSerial (WSerialT(..), WSerial)
import Streamly.Internal.Data.Stream.Serial
(SerialT, Serial, WSerialT(..), WSerial)
import Streamly.Internal.Data.Stream.Async
(AsyncT(..), Async, WAsyncT(..), WAsync)
import Streamly.Internal.Data.Stream.Ahead (AheadT(..), Ahead)
@ -116,7 +116,6 @@ import qualified Streamly.Internal.Data.Stream.Ahead as Ahead
import qualified Streamly.Internal.Data.Stream.Async as Async
import qualified Streamly.Internal.Data.Stream.Parallel as Parallel
import qualified Streamly.Internal.Data.Stream.Serial as Serial
import qualified Streamly.Internal.Data.Stream.WSerial as WSerial
import qualified Streamly.Internal.Data.Stream.StreamD.Type as D
(Stream(..), toStreamK, fromStreamK
, drain, eqBy, cmpBy, fromList, toList, foldrMx, foldlMx'
@ -470,12 +469,12 @@ instance IsStream WSerialT where
{-# INLINE consM #-}
{-# SPECIALIZE consM :: IO a -> WSerialT IO a -> WSerialT IO a #-}
consM :: Monad m => m a -> WSerialT m a -> WSerialT m a
consM = WSerial.consMWSerial
consM = Serial.consMWSerial
{-# INLINE (|:) #-}
{-# SPECIALIZE (|:) :: IO a -> WSerialT IO a -> WSerialT IO a #-}
(|:) :: Monad m => m a -> WSerialT m a -> WSerialT m a
(|:) = WSerial.consMWSerial
(|:) = Serial.consMWSerial
-------------------------------------------------------------------------------
-- Async

View File

@ -1,3 +1,5 @@
{-# LANGUAGE UndecidableInstances #-}
-- |
-- Module : Streamly.Internal.Data.Stream.Serial
-- Copyright : (c) 2017 Composewell Technologies
@ -10,6 +12,7 @@
-- To run examples in this module:
--
-- >>> import qualified Streamly.Data.Stream as Stream
-- >>> import qualified Streamly.Prelude as IsStream
--
module Streamly.Internal.Data.Stream.Serial
(
@ -18,6 +21,14 @@ module Streamly.Internal.Data.Stream.Serial
, Serial
, serial
-- * Serial interleaving stream
, WSerialT(..)
, WSerial
, wSerial
, wSerialFst
, wSerialMin
, consMWSerial
-- * Construction
, Stream.cons
, Stream.consM
@ -34,15 +45,42 @@ module Streamly.Internal.Data.Stream.Serial
)
where
import GHC.Exts (IsList(..))
import Control.Applicative (liftA2)
import Control.DeepSeq (NFData(..), NFData1(..))
import Control.Monad.Base (MonadBase(..), liftBaseDefault)
import Control.Monad.Catch (MonadThrow, throwM)
import Control.Monad.IO.Class (MonadIO(..))
import Control.Monad.Reader.Class (MonadReader(..))
import Control.Monad.State.Class (MonadState(..))
import Control.Monad.Trans.Class (MonadTrans(lift))
import Data.Foldable (Foldable(foldl'), fold)
import Data.Functor.Identity (Identity(..), runIdentity)
import Data.Maybe (fromMaybe)
import Data.Semigroup (Endo(..))
import GHC.Exts (IsList(..), IsString(..), oneShot)
import Text.Read
( Lexeme(Ident), lexP, parens, prec, readPrec, readListPrec
, readListPrecDefault)
import Streamly.Internal.BaseCompat ((#.))
import Streamly.Internal.Data.Maybe.Strict (Maybe'(..), toMaybe)
import Streamly.Data.Stream (Stream)
import qualified Streamly.Data.Stream as Stream
import qualified Streamly.Internal.Data.Stream.Common as P
import qualified Streamly.Internal.Data.Stream.StreamD as D
(fromStreamK, toStreamK, mapM)
import qualified Streamly.Internal.Data.Stream.StreamK.Type as K
(Stream, cons, consM, nil, concatMapWith, fromPure, bindWith
, withLocal, interleave, interleaveFst, interleaveMin)
import Prelude hiding (map)
import Prelude hiding (map, mapM, repeat, filter)
#include "Instances.hs"
#include "inline.hs"
-- $setup
-- >>> import qualified Streamly.Data.Stream as Stream
-- >>> import qualified Streamly.Prelude as IsStream
------------------------------------------------------------------------------
-- SerialT
@ -82,3 +120,146 @@ serial = (<>)
{-# INLINE map #-}
map :: Monad m => (a -> b) -> SerialT m a -> SerialT m b
map f = Stream.mapM (return . f)
------------------------------------------------------------------------------
-- WSerialT
------------------------------------------------------------------------------
-- | For 'WSerialT' streams:
--
-- @
-- (<>) = 'Streamly.Prelude.wSerial' -- 'Semigroup'
-- (>>=) = flip . 'Streamly.Prelude.concatMapWith' 'Streamly.Prelude.wSerial' -- 'Monad'
-- @
--
-- Note that '<>' is associative only if we disregard the ordering of elements
-- in the resulting stream.
--
-- A single 'Monad' bind behaves like a @for@ loop:
--
-- >>> :{
-- IsStream.toList $ IsStream.fromWSerial $ do
-- x <- IsStream.fromList [1,2] -- foreach x in stream
-- return x
-- :}
-- [1,2]
--
-- Nested monad binds behave like interleaved nested @for@ loops:
--
-- >>> :{
-- IsStream.toList $ IsStream.fromWSerial $ do
-- x <- IsStream.fromList [1,2] -- foreach x in stream
-- y <- IsStream.fromList [3,4] -- foreach y in stream
-- return (x, y)
-- :}
-- [(1,3),(2,3),(1,4),(2,4)]
--
-- It is a result of interleaving all the nested iterations corresponding to
-- element @1@ in the first stream with all the nested iterations of element
-- @2@:
--
-- >>> import Streamly.Prelude (wSerial)
-- >>> IsStream.toList $ IsStream.fromList [(1,3),(1,4)] `IsStream.wSerial` IsStream.fromList [(2,3),(2,4)]
-- [(1,3),(2,3),(1,4),(2,4)]
--
-- The @W@ in the name stands for @wide@ or breadth wise scheduling in
-- contrast to the depth wise scheduling behavior of 'Stream'.
--
-- /Since: 0.2.0 ("Streamly")/
--
-- @since 0.8.0
newtype WSerialT m a = WSerialT {getWSerialT :: K.Stream m a}
deriving (MonadTrans)
-- | An interleaving serial IO stream of elements of type @a@. See 'WSerialT'
-- documentation for more details.
--
-- /Since: 0.2.0 ("Streamly")/
--
-- @since 0.8.0
type WSerial = WSerialT IO
{-# INLINE consMWSerial #-}
{-# SPECIALIZE consMWSerial :: IO a -> WSerialT IO a -> WSerialT IO a #-}
consMWSerial :: Monad m => m a -> WSerialT m a -> WSerialT m a
consMWSerial m (WSerialT ms) = WSerialT $ K.consM m ms
------------------------------------------------------------------------------
-- Semigroup
------------------------------------------------------------------------------
infixr 6 `wSerial`
-- | Interleaves two streams, yielding one element from each stream
-- alternately. When one stream stops the rest of the other stream is used in
-- the output stream.
--
-- This gives exponential priority to earlier streams than the ones joining
-- later. Because of exponential weighting it can be used with 'concatMapWith'.
--
-- /Not fused/
-- NOTE:
--
-- Note that evaluation of @a \`wSerial` b \`wSerial` c@ does not interleave
-- @a@, @b@ and @c@ with equal priority. This expression is equivalent to @a
-- \`wSerial` (b \`wSerial` c)@, therefore, it fairly interleaves @a@ with the
-- result of @b \`wSerial` c@. For example, @Stream.fromList [1,2] \`wSerial`
-- Stream.fromList [3,4] \`wSerial` Stream.fromList [5,6]@ would result in
-- [1,3,2,5,4,6]. In other words, the leftmost stream gets the same scheduling
-- priority as the rest of the streams taken together. The same is true for
-- each subexpression on the right.
--
{-# INLINE wSerial #-}
wSerial :: WSerialT m a -> WSerialT m a -> WSerialT m a
wSerial (WSerialT m1) (WSerialT m2) = WSerialT $ K.interleave m1 m2
{-# INLINE wSerialFst #-}
wSerialFst :: WSerialT m a -> WSerialT m a -> WSerialT m a
wSerialFst (WSerialT m1) (WSerialT m2) = WSerialT $ K.interleaveFst m1 m2
{-# INLINE wSerialMin #-}
wSerialMin :: WSerialT m a -> WSerialT m a -> WSerialT m a
wSerialMin (WSerialT m1) (WSerialT m2) = WSerialT $ K.interleaveMin m1 m2
instance Semigroup (WSerialT m a) where
(<>) = wSerial
------------------------------------------------------------------------------
-- Monoid
------------------------------------------------------------------------------
instance Monoid (WSerialT m a) where
mempty = WSerialT K.nil
mappend = (<>)
{-# INLINE apWSerial #-}
apWSerial :: Monad m => WSerialT m (a -> b) -> WSerialT m a -> WSerialT m b
apWSerial (WSerialT m1) (WSerialT m2) =
let f x1 = K.concatMapWith K.interleave (pure . x1) m2
in WSerialT $ K.concatMapWith K.interleave f m1
instance Monad m => Applicative (WSerialT m) where
{-# INLINE pure #-}
pure = WSerialT . K.fromPure
{-# INLINE (<*>) #-}
(<*>) = apWSerial
------------------------------------------------------------------------------
-- Monad
------------------------------------------------------------------------------
instance Monad m => Monad (WSerialT m) where
return = pure
{-# INLINE (>>=) #-}
(>>=) (WSerialT m) f = WSerialT $ K.bindWith K.interleave m (getWSerialT . f)
------------------------------------------------------------------------------
-- Other instances
------------------------------------------------------------------------------
MONAD_COMMON_INSTANCES(WSerialT,)
LIST_INSTANCES(WSerialT)
NFDATA1_INSTANCE(WSerialT)
FOLDABLE_INSTANCE(WSerialT)
TRAVERSABLE_INSTANCE(WSerialT)

View File

@ -1,205 +0,0 @@
{-# LANGUAGE UndecidableInstances #-}
-- |
-- Module : Streamly.Internal.Data.Stream.WSerial
-- Copyright : (c) 2017 Composewell Technologies
--
-- License : BSD3
-- Maintainer : streamly@composewell.com
-- Stability : experimental
-- Portability : GHC
--
-- To run examples in this module:
--
-- >>> import qualified Streamly.Prelude as Stream
--
module Streamly.Internal.Data.Stream.WSerial
(
-- * Serial interleaving stream
WSerialT(..)
, WSerial
, wSerial
, wSerialFst
, wSerialMin
, consMWSerial
)
where
import Control.Applicative (liftA2)
import Control.DeepSeq (NFData(..))
import Control.DeepSeq (NFData1(..))
import Control.Monad.Base (MonadBase(..), liftBaseDefault)
import Control.Monad.Catch (MonadThrow, throwM)
import Control.Monad.IO.Class (MonadIO(..))
import Control.Monad.Reader.Class (MonadReader(..))
import Control.Monad.State.Class (MonadState(..))
import Control.Monad.Trans.Class (MonadTrans(lift))
import Data.Foldable (Foldable(foldl'), fold)
import Data.Functor.Identity (Identity(..), runIdentity)
import Data.Maybe (fromMaybe)
import Data.Semigroup (Endo(..))
import GHC.Exts (IsList(..), IsString(..), oneShot)
import Text.Read
( Lexeme(Ident), lexP, parens, prec, readPrec, readListPrec
, readListPrecDefault)
import Streamly.Internal.BaseCompat ((#.))
import Streamly.Internal.Data.Maybe.Strict (Maybe'(..), toMaybe)
import Streamly.Internal.Data.Stream.StreamK.Type (Stream)
import qualified Streamly.Internal.Data.Stream.Common as P
import qualified Streamly.Internal.Data.Stream.StreamD as D
(fromStreamK, toStreamK, mapM)
import qualified Streamly.Internal.Data.Stream.StreamK.Type as K
(cons, consM, nil, concatMapWith, fromPure, bindWith
, withLocal, interleave, interleaveFst, interleaveMin)
import Prelude hiding (map, mapM, repeat, filter)
#include "Instances.hs"
#include "inline.hs"
-- $setup
-- >>> import qualified Streamly.Prelude as Stream
------------------------------------------------------------------------------
-- WSerialT
------------------------------------------------------------------------------
-- | For 'WSerialT' streams:
--
-- @
-- (<>) = 'Streamly.Prelude.wSerial' -- 'Semigroup'
-- (>>=) = flip . 'Streamly.Prelude.concatMapWith' 'Streamly.Prelude.wSerial' -- 'Monad'
-- @
--
-- Note that '<>' is associative only if we disregard the ordering of elements
-- in the resulting stream.
--
-- A single 'Monad' bind behaves like a @for@ loop:
--
-- >>> :{
-- Stream.toList $ Stream.fromWSerial $ do
-- x <- Stream.fromList [1,2] -- foreach x in stream
-- return x
-- :}
-- [1,2]
--
-- Nested monad binds behave like interleaved nested @for@ loops:
--
-- >>> :{
-- Stream.toList $ Stream.fromWSerial $ do
-- x <- Stream.fromList [1,2] -- foreach x in stream
-- y <- Stream.fromList [3,4] -- foreach y in stream
-- return (x, y)
-- :}
-- [(1,3),(2,3),(1,4),(2,4)]
--
-- It is a result of interleaving all the nested iterations corresponding to
-- element @1@ in the first stream with all the nested iterations of element
-- @2@:
--
-- >>> import Streamly.Prelude (wSerial)
-- >>> Stream.toList $ Stream.fromList [(1,3),(1,4)] `Stream.wSerial` Stream.fromList [(2,3),(2,4)]
-- [(1,3),(2,3),(1,4),(2,4)]
--
-- The @W@ in the name stands for @wide@ or breadth wise scheduling in
-- contrast to the depth wise scheduling behavior of 'Stream'.
--
-- /Since: 0.2.0 ("Streamly")/
--
-- @since 0.8.0
newtype WSerialT m a = WSerialT {getWSerialT :: Stream m a}
deriving (MonadTrans)
-- | An interleaving serial IO stream of elements of type @a@. See 'WSerialT'
-- documentation for more details.
--
-- /Since: 0.2.0 ("Streamly")/
--
-- @since 0.8.0
type WSerial = WSerialT IO
{-# INLINE consMWSerial #-}
{-# SPECIALIZE consMWSerial :: IO a -> WSerialT IO a -> WSerialT IO a #-}
consMWSerial :: Monad m => m a -> WSerialT m a -> WSerialT m a
consMWSerial m (WSerialT ms) = WSerialT $ K.consM m ms
------------------------------------------------------------------------------
-- Semigroup
------------------------------------------------------------------------------
infixr 6 `wSerial`
-- | Interleaves two streams, yielding one element from each stream
-- alternately. When one stream stops the rest of the other stream is used in
-- the output stream.
--
-- This gives exponential priority to earlier streams than the ones joining
-- later. Because of exponential weighting it can be used with 'concatMapWith'.
--
-- /Not fused/
-- NOTE:
--
-- Note that evaluation of @a \`wSerial` b \`wSerial` c@ does not interleave
-- @a@, @b@ and @c@ with equal priority. This expression is equivalent to @a
-- \`wSerial` (b \`wSerial` c)@, therefore, it fairly interleaves @a@ with the
-- result of @b \`wSerial` c@. For example, @Stream.fromList [1,2] \`wSerial`
-- Stream.fromList [3,4] \`wSerial` Stream.fromList [5,6]@ would result in
-- [1,3,2,5,4,6]. In other words, the leftmost stream gets the same scheduling
-- priority as the rest of the streams taken together. The same is true for
-- each subexpression on the right.
--
{-# INLINE wSerial #-}
wSerial :: WSerialT m a -> WSerialT m a -> WSerialT m a
wSerial (WSerialT m1) (WSerialT m2) = WSerialT $ K.interleave m1 m2
{-# INLINE wSerialFst #-}
wSerialFst :: WSerialT m a -> WSerialT m a -> WSerialT m a
wSerialFst (WSerialT m1) (WSerialT m2) = WSerialT $ K.interleaveFst m1 m2
{-# INLINE wSerialMin #-}
wSerialMin :: WSerialT m a -> WSerialT m a -> WSerialT m a
wSerialMin (WSerialT m1) (WSerialT m2) = WSerialT $ K.interleaveMin m1 m2
instance Semigroup (WSerialT m a) where
(<>) = wSerial
------------------------------------------------------------------------------
-- Monoid
------------------------------------------------------------------------------
instance Monoid (WSerialT m a) where
mempty = WSerialT K.nil
mappend = (<>)
{-# INLINE apWSerial #-}
apWSerial :: Monad m => WSerialT m (a -> b) -> WSerialT m a -> WSerialT m b
apWSerial (WSerialT m1) (WSerialT m2) =
let f x1 = K.concatMapWith K.interleave (pure . x1) m2
in WSerialT $ K.concatMapWith K.interleave f m1
instance Monad m => Applicative (WSerialT m) where
{-# INLINE pure #-}
pure = WSerialT . K.fromPure
{-# INLINE (<*>) #-}
(<*>) = apWSerial
------------------------------------------------------------------------------
-- Monad
------------------------------------------------------------------------------
instance Monad m => Monad (WSerialT m) where
return = pure
{-# INLINE (>>=) #-}
(>>=) (WSerialT m) f = WSerialT $ K.bindWith K.interleave m (getWSerialT . f)
------------------------------------------------------------------------------
-- Other instances
------------------------------------------------------------------------------
MONAD_COMMON_INSTANCES(WSerialT,)
LIST_INSTANCES(WSerialT)
NFDATA1_INSTANCE(WSerialT)
FOLDABLE_INSTANCE(WSerialT)
TRAVERSABLE_INSTANCE(WSerialT)

View File

@ -344,7 +344,6 @@ library
, Streamly.Internal.Data.Stream.SVar.Eliminate
, Streamly.Internal.Data.Stream.Serial
, Streamly.Internal.Data.Stream.WSerial
, Streamly.Internal.Data.Stream.Async
, Streamly.Internal.Data.Stream.Parallel
, Streamly.Internal.Data.Stream.Channel.Types

View File

@ -15,7 +15,7 @@ import Test.Hspec.QuickCheck
import Test.QuickCheck.Monadic (monadicIO, run)
import Test.Hspec as H
import qualified Streamly.Internal.Data.Stream.WSerial as WSerial
import qualified Streamly.Internal.Data.Stream.Serial as Serial
import Streamly.Prelude hiding (repeat)
import qualified Streamly.Prelude as S
@ -64,7 +64,7 @@ wSerialMinLengthProp =
finiteStream len = S.take len $ S.repeat (1 :: Int)
infiniteStream = S.repeat 1
combined len = infiniteStream `WSerial.wSerialMin` finiteStream len
combined len = infiniteStream `Serial.wSerialMin` finiteStream len
moduleName :: String
moduleName = "Prelude.WSerial"