mirror of
https://github.com/composewell/streamly.git
synced 2024-09-20 16:08:20 +03:00
Add Skip constructor to the core Stream type to enhance stream fusion
Introduce the skip constructor and fix the simpler warnings with some slight refactorings. Add alternate definitions of filtering operations. Revert `all` to the original definition 1. Implement the fromStreamK and zipWithM methods 2. Uncomment the linear benchmarks Cleanup and add some stylistic changes Add alternate implementation of drop function
This commit is contained in:
parent
c7666a7caa
commit
81cf15b9f6
@ -136,11 +136,12 @@ import qualified Streamly.Streams.StreamK as K
|
||||
-- | A stream is a succession of 'Step's. A 'Yield' produces a single value and
|
||||
-- the next state of the stream. 'Stop' indicates there are no more values in
|
||||
-- the stream.
|
||||
data Step s a = Yield a s | Stop
|
||||
data Step s a = Yield a s | Skip s | Stop
|
||||
|
||||
instance Functor (Step s) where
|
||||
{-# INLINE fmap #-}
|
||||
fmap f (Yield x s) = Yield (f x) s
|
||||
fmap _ (Skip s) = Skip s
|
||||
fmap _ Stop = Stop
|
||||
|
||||
-- gst = global state
|
||||
@ -166,9 +167,11 @@ cons x (Stream step state) = Stream step1 Nothing
|
||||
step1 _ Nothing = return $ Yield x (Just state)
|
||||
step1 gst (Just st) = do
|
||||
r <- step (rstState gst) st
|
||||
case r of
|
||||
Yield a s -> return $ Yield a (Just s)
|
||||
Stop -> return Stop
|
||||
return $
|
||||
case r of
|
||||
Yield a s -> Yield a (Just s)
|
||||
Skip s -> Skip (Just s)
|
||||
Stop -> Stop
|
||||
|
||||
-------------------------------------------------------------------------------
|
||||
-- Deconstruction
|
||||
@ -181,9 +184,10 @@ uncons (Stream step state) = go state
|
||||
where
|
||||
go st = do
|
||||
r <- step defState st
|
||||
return $ case r of
|
||||
Yield x s -> Just (x, Stream step s)
|
||||
Stop -> Nothing
|
||||
case r of
|
||||
Yield x s -> return $ Just (x, Stream step s)
|
||||
Skip s -> go s
|
||||
Stop -> return Nothing
|
||||
|
||||
------------------------------------------------------------------------------
|
||||
-- Generation by unfold
|
||||
@ -284,6 +288,7 @@ foldrM f z (Stream step state) = go SPEC state
|
||||
r <- step defState st
|
||||
case r of
|
||||
Yield x s -> go SPEC s >>= f x
|
||||
Skip s -> go SPEC s
|
||||
Stop -> return z
|
||||
|
||||
{-# INLINE_NORMAL foldr #-}
|
||||
@ -300,7 +305,8 @@ foldlM' fstep begin (Stream step state) = go SPEC begin state
|
||||
Yield x s -> do
|
||||
acc' <- fstep acc x
|
||||
go SPEC acc' s
|
||||
Stop -> return acc
|
||||
Skip s -> go SPEC acc s
|
||||
Stop -> return acc
|
||||
|
||||
{-# INLINE foldl' #-}
|
||||
foldl' :: Monad m => (b -> a -> b) -> b -> Stream m a -> m b
|
||||
@ -319,6 +325,7 @@ runStream (Stream step state) = go SPEC state
|
||||
r <- step defState st
|
||||
case r of
|
||||
Yield _ s -> go SPEC s
|
||||
Skip s -> go SPEC s
|
||||
Stop -> return ()
|
||||
|
||||
{-# INLINE_NORMAL null #-}
|
||||
@ -329,7 +336,8 @@ null (Stream step state) = go state
|
||||
r <- step defState st
|
||||
case r of
|
||||
Yield _ _ -> return False
|
||||
Stop -> return True
|
||||
Skip s -> go s
|
||||
Stop -> return True
|
||||
|
||||
-- XXX SPEC?
|
||||
{-# INLINE_NORMAL head #-}
|
||||
@ -340,7 +348,8 @@ head (Stream step state) = go state
|
||||
r <- step defState st
|
||||
case r of
|
||||
Yield x _ -> return (Just x)
|
||||
Stop -> return Nothing
|
||||
Skip s -> go s
|
||||
Stop -> return Nothing
|
||||
|
||||
-- Does not fuse, has the same performance as the StreamK version.
|
||||
{-# INLINE_NORMAL tail #-}
|
||||
@ -351,7 +360,8 @@ tail (Stream step state) = go state
|
||||
r <- step defState st
|
||||
case r of
|
||||
Yield _ s -> return (Just $ Stream step s)
|
||||
Stop -> return Nothing
|
||||
Skip s -> go s
|
||||
Stop -> return Nothing
|
||||
|
||||
-- XXX will it fuse? need custom impl?
|
||||
{-# INLINE_NORMAL last #-}
|
||||
@ -365,24 +375,15 @@ elem e (Stream step state) = go state
|
||||
go st = do
|
||||
r <- step defState st
|
||||
case r of
|
||||
Yield x s ->
|
||||
if x == e
|
||||
then return True
|
||||
else go s
|
||||
Stop -> return False
|
||||
Yield x s
|
||||
| x == e -> return True
|
||||
| otherwise -> go s
|
||||
Skip s -> go s
|
||||
Stop -> return False
|
||||
|
||||
{-# INLINE_NORMAL notElem #-}
|
||||
notElem :: (Monad m, Eq a) => a -> Stream m a -> m Bool
|
||||
notElem e (Stream step state) = go state
|
||||
where
|
||||
go st = do
|
||||
r <- step defState st
|
||||
case r of
|
||||
Yield x s ->
|
||||
if x == e
|
||||
then return False
|
||||
else go s
|
||||
Stop -> return True
|
||||
notElem e s = fmap not (elem e s)
|
||||
|
||||
{-# INLINE_NORMAL all #-}
|
||||
all :: Monad m => (a -> Bool) -> Stream m a -> m Bool
|
||||
@ -391,11 +392,11 @@ all p (Stream step state) = go state
|
||||
go st = do
|
||||
r <- step defState st
|
||||
case r of
|
||||
Yield x s ->
|
||||
if p x
|
||||
then go s
|
||||
else return False
|
||||
Stop -> return True
|
||||
Yield x s
|
||||
| p x -> go s
|
||||
| otherwise -> return False
|
||||
Skip s -> go s
|
||||
Stop -> return True
|
||||
|
||||
{-# INLINE_NORMAL any #-}
|
||||
any :: Monad m => (a -> Bool) -> Stream m a -> m Bool
|
||||
@ -404,11 +405,11 @@ any p (Stream step state) = go state
|
||||
go st = do
|
||||
r <- step defState st
|
||||
case r of
|
||||
Yield x s ->
|
||||
if p x
|
||||
then return True
|
||||
else go s
|
||||
Stop -> return False
|
||||
Yield x s
|
||||
| p x -> return True
|
||||
| otherwise -> go s
|
||||
Skip s -> go s
|
||||
Stop -> return False
|
||||
|
||||
{-# INLINE_NORMAL maximum #-}
|
||||
maximum :: (Monad m, Ord a) => Stream m a -> m (Maybe a)
|
||||
@ -418,15 +419,16 @@ maximum (Stream step state) = go Nothing state
|
||||
r <- step defState st
|
||||
case r of
|
||||
Yield x s -> go (Just x) s
|
||||
Stop -> return Nothing
|
||||
Skip s -> go Nothing s
|
||||
Stop -> return Nothing
|
||||
go (Just acc) st = do
|
||||
r <- step defState st
|
||||
case r of
|
||||
Yield x s ->
|
||||
if acc <= x
|
||||
then go (Just x) s
|
||||
else go (Just acc) s
|
||||
Stop -> return (Just acc)
|
||||
Yield x s
|
||||
| acc <= x -> go (Just x) s
|
||||
| otherwise -> go (Just acc) s
|
||||
Skip s -> go (Just acc) s
|
||||
Stop -> return (Just acc)
|
||||
|
||||
{-# INLINE_NORMAL minimum #-}
|
||||
minimum :: (Monad m, Ord a) => Stream m a -> m (Maybe a)
|
||||
@ -436,15 +438,16 @@ minimum (Stream step state) = go Nothing state
|
||||
r <- step defState st
|
||||
case r of
|
||||
Yield x s -> go (Just x) s
|
||||
Stop -> return Nothing
|
||||
Skip s -> go Nothing s
|
||||
Stop -> return Nothing
|
||||
go (Just acc) st = do
|
||||
r <- step defState st
|
||||
case r of
|
||||
Yield x s ->
|
||||
if acc <= x
|
||||
then go (Just acc) s
|
||||
else go (Just x) s
|
||||
Stop -> return (Just acc)
|
||||
Yield x s
|
||||
| acc <= x -> go (Just acc) s
|
||||
| otherwise -> go (Just x) s
|
||||
Skip s -> go (Just acc) s
|
||||
Stop -> return (Just acc)
|
||||
|
||||
------------------------------------------------------------------------------
|
||||
-- Map and Fold
|
||||
@ -468,10 +471,11 @@ toList = foldr (:) []
|
||||
toStreamK :: Monad m => Stream m a -> K.Stream m a
|
||||
toStreamK (Stream step state) = go state
|
||||
where
|
||||
go st = K.Stream $ \gst stp _ yld -> do
|
||||
go st = K.Stream $ \gst stp sng yld -> do
|
||||
r <- step gst st
|
||||
case r of
|
||||
Yield x s -> yld x (go s)
|
||||
Skip s -> K.unStream (go s) gst stp sng yld
|
||||
Stop -> stp
|
||||
|
||||
#ifndef DISABLE_FUSION
|
||||
@ -497,7 +501,8 @@ postscanlM' fstep begin (Stream step state) =
|
||||
Yield x s -> do
|
||||
y <- fstep acc x
|
||||
y `seq` return (Yield y (s, y))
|
||||
Stop -> return Stop
|
||||
Skip s -> return $ Skip (s, acc)
|
||||
Stop -> return Stop
|
||||
|
||||
{-# INLINE_LATE scanlM' #-}
|
||||
scanlM' :: Monad m => (b -> a -> m b) -> b -> Stream m a -> Stream m b
|
||||
@ -520,6 +525,7 @@ take n (Stream step state) = n `seq` Stream step' (state, 0)
|
||||
r <- step (rstState gst) st
|
||||
return $ case r of
|
||||
Yield x s -> Yield x (s, i + 1)
|
||||
Skip s -> Skip (s, i)
|
||||
Stop -> Stop
|
||||
step' _ (_, _) = return Stop
|
||||
|
||||
@ -534,7 +540,8 @@ takeWhileM f (Stream step state) = Stream step' state
|
||||
Yield x s -> do
|
||||
b <- f x
|
||||
return $ if b then Yield x s else Stop
|
||||
Stop -> return Stop
|
||||
Skip s -> return $ Skip s
|
||||
Stop -> return Stop
|
||||
|
||||
{-# INLINE takeWhile #-}
|
||||
takeWhile :: Monad m => (a -> Bool) -> Stream m a -> Stream m a
|
||||
@ -542,15 +549,26 @@ takeWhile f = takeWhileM (return . f)
|
||||
|
||||
{-# INLINE_NORMAL drop #-}
|
||||
drop :: Monad m => Int -> Stream m a -> Stream m a
|
||||
drop n (Stream step state) = Stream step' (state, n)
|
||||
drop n (Stream step state) = Stream step' (state, Just n)
|
||||
where
|
||||
{-# INLINE_LATE step' #-}
|
||||
step' gst (st, i) = do
|
||||
r <- step (rstState gst) st
|
||||
step' gst (st, Just i)
|
||||
| i > 0 = do
|
||||
r <- step (rstState gst) st
|
||||
return $
|
||||
case r of
|
||||
Yield _ s -> Skip (s, Just (i - 1))
|
||||
Skip s -> Skip (s, Just i)
|
||||
Stop -> Stop
|
||||
| otherwise = return $ Skip (st, Nothing)
|
||||
|
||||
step' gst (st, Nothing) = do
|
||||
r <- step (rstState gst) st
|
||||
return $
|
||||
case r of
|
||||
Yield _ s | i > 0 -> step' gst (s, i - 1)
|
||||
Yield x s -> return $ Yield x (s, 0)
|
||||
Stop -> return Stop
|
||||
Yield x s -> Yield x (s, Nothing)
|
||||
Skip s -> Skip (s, Nothing)
|
||||
Stop -> Stop
|
||||
|
||||
data DropWhileState s a
|
||||
= DropWhileDrop s
|
||||
@ -568,14 +586,16 @@ dropWhileM f (Stream step state) = Stream step' (DropWhileDrop state)
|
||||
Yield x s -> do
|
||||
b <- f x
|
||||
if b
|
||||
then step' gst (DropWhileDrop s)
|
||||
else step' gst (DropWhileYield x s)
|
||||
then return $ Skip (DropWhileDrop s)
|
||||
else return $ Skip (DropWhileYield x s)
|
||||
Skip s -> return $ Skip (DropWhileDrop s)
|
||||
Stop -> return Stop
|
||||
|
||||
step' gst (DropWhileNext st) = do
|
||||
r <- step (rstState gst) st
|
||||
case r of
|
||||
Yield x s -> step' gst (DropWhileYield x s)
|
||||
Yield x s -> return $ Skip (DropWhileYield x s)
|
||||
Skip s -> return $ Skip (DropWhileNext s)
|
||||
Stop -> return Stop
|
||||
|
||||
step' _ (DropWhileYield x st) = return $ Yield x (DropWhileNext st)
|
||||
@ -594,10 +614,11 @@ filterM f (Stream step state) = Stream step' state
|
||||
case r of
|
||||
Yield x s -> do
|
||||
b <- f x
|
||||
if b
|
||||
then return $ Yield x s
|
||||
else step' gst s
|
||||
Stop -> return Stop
|
||||
return $ if b
|
||||
then Yield x s
|
||||
else Skip s
|
||||
Skip s -> return $ Skip s
|
||||
Stop -> return Stop
|
||||
|
||||
{-# INLINE filter #-}
|
||||
filter :: Monad m => (a -> Bool) -> Stream m a -> Stream m a
|
||||
@ -617,6 +638,7 @@ mapM f (Stream step state) = Stream step' state
|
||||
r <- step (rstState gst) st
|
||||
case r of
|
||||
Yield x s -> f x >>= \a -> return $ Yield a s
|
||||
Skip s -> return $ Skip s
|
||||
Stop -> return Stop
|
||||
|
||||
{-# INLINE map #-}
|
||||
@ -648,9 +670,11 @@ zipWithM f (Stream stepa ta) (Stream stepb tb) = Stream step (ta, tb, Nothing)
|
||||
{-# INLINE_LATE step #-}
|
||||
step gst (sa, sb, Nothing) = do
|
||||
r <- stepa (rstState gst) sa
|
||||
case r of
|
||||
Yield x sa' -> step gst (sa', sb, Just x)
|
||||
Stop -> return Stop
|
||||
return $
|
||||
case r of
|
||||
Yield x sa' -> Skip (sa', sb, Just x)
|
||||
Skip sa' -> Skip (sa', sb, Nothing)
|
||||
Stop -> Stop
|
||||
|
||||
step gst (sa, sb, Just x) = do
|
||||
r <- stepb (rstState gst) sb
|
||||
@ -658,7 +682,8 @@ zipWithM f (Stream stepa ta) (Stream stepb tb) = Stream step (ta, tb, Nothing)
|
||||
Yield y sb' -> do
|
||||
z <- f x y
|
||||
return $ Yield z (sa, sb', Nothing)
|
||||
Stop -> return Stop
|
||||
Skip sb' -> return $ Skip (sa, sb', Just x)
|
||||
Stop -> return Stop
|
||||
|
||||
{-# RULES "zipWithM xs xs"
|
||||
forall f xs. zipWithM f xs xs = mapM (\x -> f x x) xs #-}
|
||||
|
Loading…
Reference in New Issue
Block a user