mirror of
https://github.com/ilyakooo0/streamly.git
synced 2024-09-17 11:37:20 +03:00
Optimize Functor/Applicative/Monad etc for serial
Some of the benchmarks were order of magnitude off due to missing INLINE for type class operations. Now, all of them are in reasonable limits. Benchmarks affected for serial streams: * Functor, Applicative, Monad, transformers We need to do a similar exercise for other types of streams and for folds/parsers as well.
This commit is contained in:
parent
b4cef11d19
commit
55d49bd50c
@ -27,6 +27,7 @@ import Control.Monad (when)
|
||||
import Control.Monad.IO.Class (MonadIO(..))
|
||||
import Control.Monad.State.Strict (StateT, get, put, MonadState)
|
||||
import qualified Control.Monad.State.Strict as State
|
||||
import Control.Monad.Trans.Class (lift)
|
||||
import Data.Functor.Identity (Identity, runIdentity)
|
||||
import Data.IORef (newIORef, modifyIORef')
|
||||
import GHC.Generics (Generic)
|
||||
@ -1610,8 +1611,11 @@ o_1_space_concat value = sqrtVal `seq`
|
||||
|
||||
o_1_space_applicative :: Int -> [Benchmark]
|
||||
o_1_space_applicative value =
|
||||
[ bgroup "applicative"
|
||||
[ benchIO "outer product (sqrt n x sqrt n)" $ toNullAp value serially
|
||||
[ bgroup "Applicative"
|
||||
[ benchIO "(*>) (sqrt n x sqrt n)" $ apDiscardFst value serially
|
||||
, benchIO "(<*) (sqrt n x sqrt n)" $ apDiscardSnd value serially
|
||||
, benchIO "(<*>) (sqrt n x sqrt n)" $ toNullAp value serially
|
||||
, benchIO "liftA2 (sqrt n x sqrt n)" $ apLiftA2 value serially
|
||||
]
|
||||
]
|
||||
|
||||
@ -1636,16 +1640,17 @@ o_n_space_applicative value =
|
||||
o_1_space_monad :: Int -> [Benchmark]
|
||||
o_1_space_monad value =
|
||||
[ bgroup "Monad"
|
||||
[ benchIO "outer product (sqrt n x sqrt n)" $ toNullM value serially
|
||||
, benchIO "outer product (sqrt n x sqrt n) (filterAllOut)" $
|
||||
[ benchIO "(>>) (sqrt n x sqrt n)" $ monadThen value serially
|
||||
, benchIO "(>>=) (sqrt n x sqrt n)" $ toNullM value serially
|
||||
, benchIO "(>>=) (sqrt n x sqrt n) (filterAllOut)" $
|
||||
filterAllOutM value serially
|
||||
, benchIO "outer product (sqrt n x sqrt n) (filterAllIn)" $
|
||||
, benchIO "(>>=) (sqrt n x sqrt n) (filterAllIn)" $
|
||||
filterAllInM value serially
|
||||
, benchIO "outer product (sqrt n x sqrt n) (filterSome)" $
|
||||
, benchIO "(>>=) (sqrt n x sqrt n) (filterSome)" $
|
||||
filterSome value serially
|
||||
, benchIO "outer product (sqrt n x sqrt n) (breakAfterSome)" $
|
||||
, benchIO "(>>=) (sqrt n x sqrt n) (breakAfterSome)" $
|
||||
breakAfterSome value serially
|
||||
, benchIO "outer product (cubert n x cubert n x cubert n)" $
|
||||
, benchIO "(>>=) (cubert n x cubert n x cubert n)" $
|
||||
toNullM3 value serially
|
||||
]
|
||||
]
|
||||
@ -1657,9 +1662,9 @@ o_n_space_monad value =
|
||||
iterateSingleton ((>>) . pure) value
|
||||
, benchIOSrc serially "(>>=) (n times)" $
|
||||
iterateSingleton (\x xs -> xs >>= \y -> return (x + y)) value
|
||||
, benchIO "outer product (sqrt n x sqrt n) (toList)" $
|
||||
, benchIO "(>>=) (sqrt n x sqrt n) (toList)" $
|
||||
toListM value serially
|
||||
, benchIO "outer product (sqrt n x sqrt n) (toListSome)" $
|
||||
, benchIO "(>>=) (sqrt n x sqrt n) (toListSome)" $
|
||||
toListSome value serially
|
||||
]
|
||||
]
|
||||
@ -1714,10 +1719,10 @@ iterateStateIO n = do
|
||||
{-# INLINE iterateStateT #-}
|
||||
iterateStateT :: Int -> SerialT (StateT Int IO) Int
|
||||
iterateStateT n = do
|
||||
x <- get
|
||||
x <- lift get
|
||||
if x > n
|
||||
then do
|
||||
put (x - 1)
|
||||
lift $ put (x - 1)
|
||||
iterateStateT n
|
||||
else return x
|
||||
|
||||
@ -1734,8 +1739,8 @@ iterateState n = do
|
||||
iterateState n
|
||||
else return x
|
||||
|
||||
o_n_space_transformer :: Int -> [Benchmark]
|
||||
o_n_space_transformer value =
|
||||
o_n_heap_transformer :: Int -> [Benchmark]
|
||||
o_n_heap_transformer value =
|
||||
[ bgroup "transformer"
|
||||
[ benchIO "StateT Int IO (n times) (baseline)" $ \n ->
|
||||
State.evalStateT (iterateStateIO n) value
|
||||
@ -1805,6 +1810,7 @@ main = do
|
||||
|
||||
-- transformation
|
||||
, o_n_heap_reordering size
|
||||
, o_n_heap_transformer size
|
||||
]
|
||||
, bgroup (o_n_space_prefix moduleName) $ Prelude.concat
|
||||
[ o_n_space_elimination_foldable size
|
||||
@ -1819,6 +1825,5 @@ main = do
|
||||
, o_n_space_functor size
|
||||
, o_n_space_applicative size
|
||||
, o_n_space_monad size
|
||||
, o_n_space_transformer size
|
||||
]
|
||||
]
|
||||
|
@ -12,6 +12,7 @@
|
||||
|
||||
module Streamly.Benchmark.Prelude where
|
||||
|
||||
import Control.Applicative (liftA2)
|
||||
import Control.DeepSeq (NFData(..))
|
||||
import Control.Exception (try)
|
||||
import Data.Functor.Identity (Identity)
|
||||
@ -345,6 +346,42 @@ concatStreamsWith op outer inner n =
|
||||
runToList :: Monad m => S.SerialT m a -> m [a]
|
||||
runToList = S.toList
|
||||
|
||||
{-# INLINE apDiscardFst #-}
|
||||
apDiscardFst
|
||||
:: (S.IsStream t, S.MonadAsync m, Applicative (t m))
|
||||
=> Int -> (t m Int -> S.SerialT m Int) -> Int -> m ()
|
||||
apDiscardFst linearCount t start = S.drain . t $
|
||||
S.serially (sourceUnfoldrM nestedCount2 start)
|
||||
*> S.serially (sourceUnfoldrM nestedCount2 start)
|
||||
|
||||
where
|
||||
|
||||
nestedCount2 = round (fromIntegral linearCount**(1/2::Double))
|
||||
|
||||
{-# INLINE apDiscardSnd #-}
|
||||
apDiscardSnd
|
||||
:: (S.IsStream t, S.MonadAsync m, Applicative (t m))
|
||||
=> Int -> (t m Int -> S.SerialT m Int) -> Int -> m ()
|
||||
apDiscardSnd linearCount t start = S.drain . t $
|
||||
S.serially (sourceUnfoldrM nestedCount2 start)
|
||||
<* S.serially (sourceUnfoldrM nestedCount2 start)
|
||||
|
||||
where
|
||||
|
||||
nestedCount2 = round (fromIntegral linearCount**(1/2::Double))
|
||||
|
||||
{-# INLINE apLiftA2 #-}
|
||||
apLiftA2
|
||||
:: (S.IsStream t, S.MonadAsync m, Applicative (t m))
|
||||
=> Int -> (t m Int -> S.SerialT m Int) -> Int -> m ()
|
||||
apLiftA2 linearCount t start = S.drain . t $
|
||||
liftA2 (+) (S.serially (sourceUnfoldrM nestedCount2 start))
|
||||
(S.serially (sourceUnfoldrM nestedCount2 start))
|
||||
|
||||
where
|
||||
|
||||
nestedCount2 = round (fromIntegral linearCount**(1/2::Double))
|
||||
|
||||
{-# INLINE toNullAp #-}
|
||||
toNullAp
|
||||
:: (S.IsStream t, S.MonadAsync m, Applicative (t m))
|
||||
@ -357,6 +394,18 @@ toNullAp linearCount t start = S.drain . t $
|
||||
|
||||
nestedCount2 = round (fromIntegral linearCount**(1/2::Double))
|
||||
|
||||
{-# INLINE monadThen #-}
|
||||
monadThen
|
||||
:: (S.IsStream t, S.MonadAsync m, Monad (t m))
|
||||
=> Int -> (t m Int -> S.SerialT m Int) -> Int -> m ()
|
||||
monadThen linearCount t start = S.drain . t $ do
|
||||
(S.serially $ sourceUnfoldrM nestedCount2 start) >>
|
||||
(S.serially $ sourceUnfoldrM nestedCount2 start)
|
||||
|
||||
where
|
||||
|
||||
nestedCount2 = round (fromIntegral linearCount**(1/2::Double))
|
||||
|
||||
{-# INLINE toNullM #-}
|
||||
toNullM
|
||||
:: (S.IsStream t, S.MonadAsync m, Monad (t m))
|
||||
|
@ -108,12 +108,10 @@ common bench-depends
|
||||
, streamly >= 0.7.0
|
||||
, random >= 1.0 && < 2.0
|
||||
, gauge >= 0.2.4 && < 0.3
|
||||
, transformers >= 0.4 && < 0.6
|
||||
if flag(fusion-plugin) && !impl(ghcjs) && !impl(ghc < 8.6)
|
||||
build-depends:
|
||||
fusion-plugin >= 0.2 && < 0.3
|
||||
if impl(ghc < 8.0)
|
||||
build-depends:
|
||||
transformers >= 0.4 && < 0.6
|
||||
if flag(inspection)
|
||||
build-depends: template-haskell >= 2.14 && < 2.17
|
||||
, inspection-testing >= 0.4 && < 0.5
|
||||
|
@ -5,6 +5,19 @@
|
||||
# RTS_OPTIONS: additional RTS options
|
||||
# QUICK_MODE: whether we are in quick mode
|
||||
|
||||
# $1: message
|
||||
die () {
|
||||
>&2 echo -e "Error: $1"
|
||||
exit 1
|
||||
}
|
||||
|
||||
warn () {
|
||||
>&2 echo -e "Warning: $1"
|
||||
}
|
||||
|
||||
test -n "$BENCH_EXEC_PATH" || die "BENCH_EXEC_PATH env var must be set"
|
||||
test -n "$QUICK_MODE" || warn "QUICK_MODE env var not set (to 0 or 1)"
|
||||
|
||||
#------------------------------------------------------------------------------
|
||||
# RTS Options
|
||||
#------------------------------------------------------------------------------
|
||||
@ -35,10 +48,10 @@ bench_rts_opts_specific () {
|
||||
Prelude.Parallel/o-n-heap/monad-outer-product/*) echo -n "-M256M" ;;
|
||||
Prelude.Parallel/o-n-space/monad-outer-product/*) echo -n "-K4M -M256M" ;;
|
||||
|
||||
Prelude.Serial/o-n-space/Functor/*) echo -n "-K4M -M256M" ;;
|
||||
Prelude.Serial/o-n-space/Applicative/*) echo -n "-K8M -M256M" ;;
|
||||
Prelude.Serial/o-n-space/Monad/*) echo -n "-K8M -M256M" ;;
|
||||
Prelude.Serial/o-n-space/transformer/*) echo -n "-K8M -M256M" ;;
|
||||
Prelude.Serial/o-n-space/Functor/*) echo -n "-K4M -M64M" ;;
|
||||
Prelude.Serial/o-n-space/Applicative/*) echo -n "-K8M -M128M" ;;
|
||||
Prelude.Serial/o-n-space/Monad/*) echo -n "-K8M -M64M" ;;
|
||||
Prelude.Serial/o-n-heap/transformer/*) echo -n "-M64M" ;;
|
||||
Prelude.Serial/o-n-space/grouping/*) echo -n "" ;;
|
||||
Prelude.Serial/o-n-space/*) echo -n "-K4M" ;;
|
||||
|
||||
@ -65,9 +78,12 @@ bench_rts_opts_specific () {
|
||||
# Speed options
|
||||
#------------------------------------------------------------------------------
|
||||
|
||||
if test "$QUICK_MODE" -eq 0
|
||||
if test -n "$QUICK_MODE"
|
||||
then
|
||||
QUICK_OPTIONS="--min-samples 3"
|
||||
if test "$QUICK_MODE" -eq 0
|
||||
then
|
||||
QUICK_OPTIONS="--min-samples 3"
|
||||
fi
|
||||
fi
|
||||
SUPER_QUICK_OPTIONS="--min-samples 1 --include-first-iter"
|
||||
|
||||
|
@ -7,10 +7,12 @@
|
||||
|
||||
#define MONADPARALLEL , MonadAsync m
|
||||
|
||||
#define MONAD_COMMON_INSTANCES(STREAM,CONSTRAINT) \
|
||||
instance Monad m => Functor (STREAM m) where { \
|
||||
{-# INLINE fmap #-}; \
|
||||
fmap f (STREAM m) = D.fromStreamD $ D.mapM (return . f) $ D.toStreamD m }; \
|
||||
#define MONAD_COMMON_INSTANCES(STREAM,CONSTRAINT) \
|
||||
instance Monad m => Functor (STREAM m) where { \
|
||||
{-# INLINE fmap #-}; \
|
||||
fmap f (STREAM m) = D.fromStreamD $ D.mapM (return . f) $ D.toStreamD m; \
|
||||
{-# INLINE (<$) #-}; \
|
||||
(<$) = fmap . const }; \
|
||||
\
|
||||
instance (MonadBase b m, Monad m CONSTRAINT) => MonadBase b (STREAM m) where {\
|
||||
liftBase = liftBaseDefault }; \
|
||||
@ -33,8 +35,11 @@ instance (MonadReader r m CONSTRAINT) => MonadReader r (STREAM m) where { \
|
||||
local f m = fromStream $ K.withLocal f (toStream m) }; \
|
||||
\
|
||||
instance (MonadState s m CONSTRAINT) => MonadState s (STREAM m) where { \
|
||||
{-# INLINE get #-}; \
|
||||
get = lift get; \
|
||||
{-# INLINE put #-}; \
|
||||
put x = lift (put x); \
|
||||
{-# INLINE state #-}; \
|
||||
state k = lift (state k) }
|
||||
|
||||
------------------------------------------------------------------------------
|
||||
|
@ -146,6 +146,7 @@ import qualified Streamly.Internal.Data.Stream.StreamD as D
|
||||
--
|
||||
-- @since 0.2.0
|
||||
newtype SerialT m a = SerialT {getSerialT :: Stream m a}
|
||||
-- XXX when deriving do we inherit an INLINE?
|
||||
deriving (Semigroup, Monoid, MonadTrans)
|
||||
|
||||
-- | A serial IO stream of elements of type @a@. See 'SerialT' documentation
|
||||
@ -182,8 +183,10 @@ instance IsStream SerialT where
|
||||
|
||||
instance Monad m => Monad (SerialT m) where
|
||||
return = pure
|
||||
|
||||
{-# INLINE (>>=) #-}
|
||||
(>>=) = K.bindWith K.serial
|
||||
|
||||
{-# INLINE (>>) #-}
|
||||
(>>) = (*>)
|
||||
|
||||
@ -218,19 +221,41 @@ map f = mapM (return . f)
|
||||
|
||||
{-# INLINE apSerial #-}
|
||||
apSerial :: Monad m => SerialT m (a -> b) -> SerialT m a -> SerialT m b
|
||||
apSerial (SerialT m1) (SerialT m2) = D.fromStreamD $ D.toStreamD m1 <*> D.toStreamD m2
|
||||
apSerial (SerialT m1) (SerialT m2) =
|
||||
D.fromStreamD $ D.toStreamD m1 <*> D.toStreamD m2
|
||||
|
||||
{-# INLINE apSequence #-}
|
||||
apSequence :: Monad m => SerialT m a -> SerialT m b -> SerialT m b
|
||||
apSequence (SerialT m1) (SerialT m2) = D.fromStreamD $ D.toStreamD m1 *> D.toStreamD m2
|
||||
apSequence (SerialT m1) (SerialT m2) =
|
||||
D.fromStreamD $ D.toStreamD m1 *> D.toStreamD m2
|
||||
|
||||
{-# INLINE apDiscardSnd #-}
|
||||
apDiscardSnd :: Monad m => SerialT m a -> SerialT m b -> SerialT m a
|
||||
apDiscardSnd (SerialT m1) (SerialT m2) =
|
||||
D.fromStreamD $ D.toStreamD m1 <* D.toStreamD m2
|
||||
|
||||
-- Note: we need to define all the typeclass operations because we want to
|
||||
-- INLINE them.
|
||||
instance Monad m => Applicative (SerialT m) where
|
||||
{-# INLINE pure #-}
|
||||
pure = SerialT . K.yield
|
||||
|
||||
{-# INLINE (<*>) #-}
|
||||
(<*>) = apSerial
|
||||
-- (<*>) = K.apSerial
|
||||
|
||||
#if MIN_VERSION_base(4,10,0)
|
||||
{-# INLINE liftA2 #-}
|
||||
liftA2 f x = (<*>) (fmap f x)
|
||||
#endif
|
||||
|
||||
{-# INLINE (*>) #-}
|
||||
(*>) = apSequence
|
||||
-- (*>) = K.apSerialDiscardFst
|
||||
|
||||
{-# INLINE (<*) #-}
|
||||
(<*) = apDiscardSnd
|
||||
-- (<*) = K.apSerialDiscardSnd
|
||||
|
||||
MONAD_COMMON_INSTANCES(SerialT,)
|
||||
LIST_INSTANCES(SerialT)
|
||||
|
@ -182,6 +182,9 @@ instance Functor m => Functor (Stream m) where
|
||||
{-# INLINE_LATE step' #-}
|
||||
step' gst st = fmap (fmap f) (step (adaptState gst) st)
|
||||
|
||||
{-# INLINE (<$) #-}
|
||||
(<$) = fmap . const
|
||||
|
||||
------------------------------------------------------------------------------
|
||||
-- concatMap
|
||||
------------------------------------------------------------------------------
|
||||
@ -260,8 +263,11 @@ concatAp (Stream stepa statea) (Stream stepb stateb) = Stream step' (Left statea
|
||||
|
||||
{-# INLINE_NORMAL apSequence #-}
|
||||
apSequence :: Functor f => Stream f a -> Stream f b -> Stream f b
|
||||
apSequence (Stream stepa statea) (Stream stepb stateb) = Stream step (Left statea)
|
||||
where
|
||||
apSequence (Stream stepa statea) (Stream stepb stateb) =
|
||||
Stream step (Left statea)
|
||||
|
||||
where
|
||||
|
||||
{-# INLINE_LATE step #-}
|
||||
step gst (Left st) =
|
||||
fmap
|
||||
@ -280,26 +286,63 @@ apSequence (Stream stepa statea) (Stream stepb stateb) = Stream step (Left state
|
||||
Stop -> Skip (Left ostate))
|
||||
(stepb gst st)
|
||||
|
||||
{-# INLINE_NORMAL apDiscardSnd #-}
|
||||
apDiscardSnd :: Functor f => Stream f a -> Stream f b -> Stream f a
|
||||
apDiscardSnd (Stream stepa statea) (Stream stepb stateb) =
|
||||
Stream step (Left statea)
|
||||
|
||||
where
|
||||
|
||||
{-# INLINE_LATE step #-}
|
||||
step gst (Left st) =
|
||||
fmap
|
||||
(\r ->
|
||||
case r of
|
||||
Yield b s -> Skip (Right (s, stateb, b))
|
||||
Skip s -> Skip (Left s)
|
||||
Stop -> Stop)
|
||||
(stepa gst st)
|
||||
step gst (Right (ostate, st, b)) =
|
||||
fmap
|
||||
(\r ->
|
||||
case r of
|
||||
Yield _ s -> Yield b (Right (ostate, s, b))
|
||||
Skip s -> Skip (Right (ostate, s, b))
|
||||
Stop -> Skip (Left ostate))
|
||||
(stepb (adaptState gst) st)
|
||||
|
||||
instance Applicative f => Applicative (Stream f) where
|
||||
{-# INLINE pure #-}
|
||||
pure = yield
|
||||
|
||||
{-# INLINE (<*>) #-}
|
||||
(<*>) = concatAp
|
||||
|
||||
#if MIN_VERSION_base(4,10,0)
|
||||
{-# INLINE liftA2 #-}
|
||||
liftA2 f x = (<*>) (fmap f x)
|
||||
#endif
|
||||
|
||||
{-# INLINE (*>) #-}
|
||||
(*>) = apSequence
|
||||
|
||||
{-# INLINE (<*) #-}
|
||||
(<*) = apDiscardSnd
|
||||
|
||||
-- NOTE: even though concatMap for StreamD is 4x faster compared to StreamK,
|
||||
-- the monad instance does not seem to be significantly faster.
|
||||
instance Monad m => Monad (Stream m) where
|
||||
{-# INLINE return #-}
|
||||
return = pure
|
||||
|
||||
{-# INLINE (>>=) #-}
|
||||
(>>=) = flip concatMap
|
||||
|
||||
{-# INLINE (>>) #-}
|
||||
(>>) = (*>)
|
||||
|
||||
instance MonadTrans Stream where
|
||||
{-# INLINE lift #-}
|
||||
lift = yieldM
|
||||
|
||||
instance (MonadThrow m) => MonadThrow (Stream m) where
|
||||
|
@ -171,6 +171,10 @@ module Streamly.Internal.Data.Stream.StreamK
|
||||
, concatMapBy
|
||||
, concatMap
|
||||
, bindWith
|
||||
, apWith
|
||||
, apSerial
|
||||
, apSerialDiscardFst
|
||||
, apSerialDiscardSnd
|
||||
|
||||
-- ** Transformation comprehensions
|
||||
, the
|
||||
|
@ -80,6 +80,10 @@ module Streamly.Internal.Data.Stream.StreamK.Type
|
||||
, concatMapBy
|
||||
, concatMap
|
||||
, bindWith
|
||||
, apWith
|
||||
, apSerial
|
||||
, apSerialDiscardFst
|
||||
, apSerialDiscardSnd
|
||||
|
||||
, Streaming -- deprecated
|
||||
)
|
||||
@ -886,6 +890,7 @@ instance Monad m => Functor (Stream m) where
|
||||
-------------------------------------------------------------------------------
|
||||
|
||||
instance MonadTrans Stream where
|
||||
{-# INLINE lift #-}
|
||||
lift = yieldM
|
||||
|
||||
-------------------------------------------------------------------------------
|
||||
@ -898,6 +903,118 @@ unShare :: IsStream t => t m a -> t m a
|
||||
unShare x = mkStream $ \st yld sng stp ->
|
||||
foldStream st yld sng stp x
|
||||
|
||||
-- XXX the function stream and value stream can run in parallel
|
||||
{-# INLINE apWith #-}
|
||||
apWith
|
||||
:: IsStream t
|
||||
=> (t m b -> t m b -> t m b)
|
||||
-> t m (a -> b)
|
||||
-> t m a
|
||||
-> t m b
|
||||
apWith par fstream stream = go1 fstream
|
||||
|
||||
where
|
||||
|
||||
go1 m =
|
||||
mkStream $ \st yld sng stp ->
|
||||
let foldShared = foldStreamShared st yld sng stp
|
||||
single f = foldShared $ unShare (go2 f stream)
|
||||
yieldk f r = foldShared $ unShare (go2 f stream) `par` go1 r
|
||||
in foldStream (adaptState st) yieldk single stp m
|
||||
|
||||
go2 f m =
|
||||
mkStream $ \st yld sng stp ->
|
||||
let single a = sng (f a)
|
||||
yieldk a r = yld (f a) (go2 f r)
|
||||
in foldStream (adaptState st) yieldk single stp m
|
||||
|
||||
{-# INLINE apSerial #-}
|
||||
apSerial
|
||||
:: IsStream t
|
||||
=> t m (a -> b)
|
||||
-> t m a
|
||||
-> t m b
|
||||
apSerial fstream stream = go1 fstream
|
||||
|
||||
where
|
||||
|
||||
go1 m =
|
||||
mkStream $ \st yld sng stp ->
|
||||
let foldShared = foldStreamShared st yld sng stp
|
||||
single f = foldShared $ go3 f stream
|
||||
yieldk f r = foldShared $ go2 f r stream
|
||||
in foldStream (adaptState st) yieldk single stp m
|
||||
|
||||
go2 f r1 m =
|
||||
mkStream $ \st yld sng stp ->
|
||||
let foldShared = foldStreamShared st yld sng stp
|
||||
stop = foldShared $ go1 r1
|
||||
single a = yld (f a) (go1 r1)
|
||||
yieldk a r = yld (f a) (go2 f r1 r)
|
||||
in foldStream (adaptState st) yieldk single stop m
|
||||
|
||||
go3 f m =
|
||||
mkStream $ \st yld sng stp ->
|
||||
let single a = sng (f a)
|
||||
yieldk a r = yld (f a) (go3 f r)
|
||||
in foldStream (adaptState st) yieldk single stp m
|
||||
|
||||
{-# INLINE apSerialDiscardFst #-}
|
||||
apSerialDiscardFst
|
||||
:: IsStream t
|
||||
=> t m a
|
||||
-> t m b
|
||||
-> t m b
|
||||
apSerialDiscardFst fstream stream = go1 fstream
|
||||
|
||||
where
|
||||
|
||||
go1 m =
|
||||
mkStream $ \st yld sng stp ->
|
||||
let foldShared = foldStreamShared st yld sng stp
|
||||
single _ = foldShared $ stream
|
||||
yieldk _ r = foldShared $ go2 r stream
|
||||
in foldStream (adaptState st) yieldk single stp m
|
||||
|
||||
go2 r1 m =
|
||||
mkStream $ \st yld sng stp ->
|
||||
let foldShared = foldStreamShared st yld sng stp
|
||||
stop = foldShared $ go1 r1
|
||||
single a = yld a (go1 r1)
|
||||
yieldk a r = yld a (go2 r1 r)
|
||||
in foldStream st yieldk single stop m
|
||||
|
||||
{-# INLINE apSerialDiscardSnd #-}
|
||||
apSerialDiscardSnd
|
||||
:: IsStream t
|
||||
=> t m a
|
||||
-> t m b
|
||||
-> t m a
|
||||
apSerialDiscardSnd fstream stream = go1 fstream
|
||||
|
||||
where
|
||||
|
||||
go1 m =
|
||||
mkStream $ \st yld sng stp ->
|
||||
let foldShared = foldStreamShared st yld sng stp
|
||||
single f = foldShared $ go3 f stream
|
||||
yieldk f r = foldShared $ go2 f r stream
|
||||
in foldStream st yieldk single stp m
|
||||
|
||||
go2 f r1 m =
|
||||
mkStream $ \st yld sng stp ->
|
||||
let foldShared = foldStreamShared st yld sng stp
|
||||
stop = foldShared $ go1 r1
|
||||
single _ = yld f (go1 r1)
|
||||
yieldk _ r = yld f (go2 f r1 r)
|
||||
in foldStream (adaptState st) yieldk single stop m
|
||||
|
||||
go3 f m =
|
||||
mkStream $ \st yld sng stp ->
|
||||
let single _ = sng f
|
||||
yieldk _ r = yld f (go3 f r)
|
||||
in foldStream (adaptState st) yieldk single stp m
|
||||
|
||||
-- XXX This is just concatMapBy with arguments flipped. We need to keep this
|
||||
-- instead of using a concatMap style definition because the bind
|
||||
-- implementation in Async and WAsync streams show significant perf degradation
|
||||
|
@ -115,6 +115,9 @@ module Streamly.Internal.Data.Unfold
|
||||
, concat
|
||||
, concatMapM
|
||||
, outerProduct
|
||||
, ap
|
||||
, apDiscardFst
|
||||
, apDiscardSnd
|
||||
|
||||
-- * Exceptions
|
||||
, gbracket
|
||||
@ -689,6 +692,34 @@ outerProduct (Unfold step1 inject1) (Unfold step2 inject2) = Unfold step inject
|
||||
Skip s -> Skip (OuterProductInner ost sy s x)
|
||||
Stop -> Skip (OuterProductOuter ost sy)
|
||||
|
||||
-- Special cases of outer product
|
||||
-- | Outer product with a function application.
|
||||
--
|
||||
-- /Unimplemented/
|
||||
--
|
||||
{-# INLINE_NORMAL ap #-}
|
||||
ap :: -- Monad m =>
|
||||
Unfold m a (b -> c) -> Unfold m d b -> Unfold m (a, d) c
|
||||
ap (Unfold _step1 _inject1) (Unfold _step2 _inject2) = undefined
|
||||
|
||||
-- | Outer product discarding the first element.
|
||||
--
|
||||
-- /Unimplemented/
|
||||
--
|
||||
{-# INLINE_NORMAL apDiscardFst #-}
|
||||
apDiscardFst :: -- Monad m =>
|
||||
Unfold m a b -> Unfold m c d -> Unfold m (a, c) d
|
||||
apDiscardFst (Unfold _step1 _inject1) (Unfold _step2 _inject2) = undefined
|
||||
|
||||
-- | Outer product discarding the second element.
|
||||
--
|
||||
-- /Unimplemented/
|
||||
--
|
||||
{-# INLINE_NORMAL apDiscardSnd #-}
|
||||
apDiscardSnd :: -- Monad m =>
|
||||
Unfold m a b -> Unfold m c d -> Unfold m (a, c) b
|
||||
apDiscardSnd (Unfold _step1 _inject1) (Unfold _step2 _inject2) = undefined
|
||||
|
||||
-- XXX This can be used to implement a Monad instance for "Unfold m ()".
|
||||
|
||||
data ConcatMapState s1 s2 = ConcatMapOuter s1 | ConcatMapInner s1 s2
|
||||
|
25
test/Prop.hs
25
test/Prop.hs
@ -4,7 +4,7 @@
|
||||
|
||||
module Main (main) where
|
||||
|
||||
import Control.Applicative (ZipList(..))
|
||||
import Control.Applicative (ZipList(..), liftA2)
|
||||
import Control.Concurrent (MVar, takeMVar, putMVar, newEmptyMVar)
|
||||
import Control.Exception
|
||||
(BlockedIndefinitelyOnMVar(..), catches,
|
||||
@ -880,6 +880,28 @@ applicativeOps constr eq t (a, b) = withMaxSuccess maxTestCount $
|
||||
let list = (,) <$> a <*> b
|
||||
listEquals eq stream list
|
||||
|
||||
stream1 <- run ((S.toList . t) (liftA2 (,) (constr a) (constr b)))
|
||||
listEquals eq stream1 list
|
||||
|
||||
-- XXX we can combine this with applicativeOps by making the type sufficiently
|
||||
-- polymorphic.
|
||||
applicativeOps1
|
||||
:: Applicative (t IO)
|
||||
=> ([Int] -> t IO Int)
|
||||
-> ([Int] -> [Int] -> Bool)
|
||||
-> (t IO Int -> SerialT IO Int)
|
||||
-> ([Int], [Int])
|
||||
-> Property
|
||||
applicativeOps1 constr eq t (a, b) = withMaxSuccess maxTestCount $
|
||||
monadicIO $ do
|
||||
stream <- run ((S.toList . t) (constr a *> constr b))
|
||||
let list = a *> b
|
||||
listEquals eq stream list
|
||||
|
||||
stream1 <- run ((S.toList . t) (constr a <* constr b))
|
||||
let list1 = a <* b
|
||||
listEquals eq stream1 list1
|
||||
|
||||
-------------------------------------------------------------------------------
|
||||
-- Zip operations
|
||||
-------------------------------------------------------------------------------
|
||||
@ -1139,6 +1161,7 @@ main = hspec
|
||||
-- XXX applicative with three arguments
|
||||
serialOps $ prop "serially applicative" . applicativeOps S.fromFoldable (==)
|
||||
serialOps $ prop "serially applicative folded" . applicativeOps folded (==)
|
||||
serialOps $ prop "serially applicative discard" . applicativeOps1 S.fromFoldable (==)
|
||||
wSerialOps $ prop "wSerially applicative" . applicativeOps S.fromFoldable sortEq
|
||||
wSerialOps $ prop "wSerially applicative folded" . applicativeOps folded sortEq
|
||||
aheadOps $ prop "aheadly applicative" . applicativeOps S.fromFoldable (==)
|
||||
|
Loading…
Reference in New Issue
Block a user