Make the initial value of Fold a Step type

Review changes in this commit were done by Harendra Kumar (@harendra-kumar)
This commit is contained in:
adithyaov 2020-12-06 11:38:50 +05:30 committed by Adithya Kumar
parent 86745a756d
commit bdedb6949b
26 changed files with 1196 additions and 756 deletions

View File

@ -111,7 +111,7 @@ writeN limit = Fold step initial extract
where
initial = do
marr <- liftIO $ newArray limit bottomElement
return (Tuple' marr 0)
return $ FL.Partial (Tuple' marr 0)
step st@(Tuple' marr i) x
| i == limit = fmap FL.Done $ extract st
| otherwise = do
@ -125,7 +125,7 @@ write = Fold step initial extract
where
initial = do
marr <- liftIO $ newArray 0 bottomElement
return (Tuple3' marr 0 0)
return $ FL.Partial (Tuple3' marr 0 0)
step (Tuple3' marr i capacity) x
| i == capacity =
let newCapacity = max (capacity * 2) 1

View File

@ -6,6 +6,7 @@
import Control.Monad (when)
import Control.Monad.IO.Class (MonadIO(..))
import Data.Bifunctor (first)
import Data.Primitive.Types (Prim(..), sizeOf)
import Streamly.Internal.Data.Fold.Types (Fold(..))
import Streamly.Internal.Data.SVar (adaptState)
@ -168,7 +169,7 @@ writeN limit = Fold step initial extract
initial = do
marr <- newArray limit
return $ Tuple' marr 0
return $ FL.Partial $ Tuple' marr 0
extract (Tuple' marr len) = shrinkArray marr len >> return marr
@ -367,8 +368,8 @@ lpackArraysChunksOf n (Fold step1 initial1 extract1) =
++ "packArraysChunksOf: the size of arrays ["
++ show n
++ "] must be a natural number"
r1 <- initial1
return (Tuple' Nothing r1)
res <- initial1
return $ first (Tuple' Nothing) res
extract (Tuple' Nothing r1) = extract1 r1
extract (Tuple' (Just buf) r1) = do
@ -386,8 +387,8 @@ lpackArraysChunksOf n (Fold step1 initial1 extract1) =
FL.Done () -> return $ FL.Done ()
FL.Partial s -> do
extract1 s
r1' <- initial1
return $ FL.Partial $ Tuple' Nothing r1'
res <- initial1
return $ first (Tuple' Nothing) res
else return $ FL.Partial $ Tuple' (Just arr) r1
step (Tuple' (Just buf) r1) arr = do
blen <- byteLength buf
@ -401,6 +402,6 @@ lpackArraysChunksOf n (Fold step1 initial1 extract1) =
FL.Done () -> return $ FL.Done ()
FL.Partial s -> do
extract1 s
r1' <- initial1
return $ FL.Partial $ Tuple' Nothing r1'
res <- initial1
return $ first (Tuple' Nothing) res
else return $ FL.Partial $ Tuple' (Just buf') r1

View File

@ -143,7 +143,7 @@ writeNAligned align limit = Fold step initial extract
initial = do
marr <- newAlignedArray limit align
return $ Tuple' marr 0
return $ FL.Partial $ Tuple' marr 0
extract (Tuple' marr len) = shrinkArray marr len >> return marr

View File

@ -8,6 +8,7 @@ import Control.DeepSeq (NFData(..))
import Control.Monad (when)
import Control.Monad.IO.Class (MonadIO(..))
import Control.Monad.Primitive (PrimMonad(..), primitive_)
import Data.Bifunctor (first)
import Data.Primitive.Types (Prim(..), sizeOf)
#if __GLASGOW_HASKELL__ < 808
import Data.Semigroup (Semigroup(..))
@ -603,8 +604,8 @@ lpackArraysChunksOf n (Fold step1 initial1 extract1) =
-- XXX we can pass the module string from the higher level API
error $ "Streamly.Internal.Data.Array.Storable.Foreign.Types.packArraysChunksOf: the size of "
++ "arrays [" ++ show n ++ "] must be a natural number"
r1 <- initial1
return (Tuple3' Nothing' 0 r1)
res <- initial1
return $ first (Tuple3' Nothing' 0) res
extract (Tuple3' Nothing' _ r1) = extract1 r1
extract (Tuple3' (Just' buf) boff r1) = do
@ -622,8 +623,8 @@ lpackArraysChunksOf n (Fold step1 initial1 extract1) =
FL.Done _ -> return $ FL.Done ()
FL.Partial s -> do
extract1 s
r1' <- initial1
return $ FL.Partial $ Tuple3' Nothing' 0 r1'
res <- initial1
return $ first (Tuple3' Nothing' 0) res
else do
buf <- MA.newArray nElem
noff <- spliceInto buf 0 arr
@ -640,8 +641,8 @@ lpackArraysChunksOf n (Fold step1 initial1 extract1) =
FL.Done _ -> return $ FL.Done ()
FL.Partial s -> do
extract1 s
r1' <- initial1
return $ FL.Partial $ Tuple3' Nothing' 0 r1'
res <- initial1
return $ first (Tuple3' Nothing' 0) res
else return $ FL.Partial $ Tuple3' (Just' buf) noff r1
data SplitState s arr

View File

@ -319,7 +319,9 @@ lastN n
step (Tuple3' rb rh i) a = do
rh1 <- liftIO $ RB.unsafeInsert rb rh a
return $ FL.Partial $ Tuple3' rb rh1 (i + 1)
initial = fmap (\(a, b) -> Tuple3' a b (0 :: Int)) $ liftIO $ RB.new n
initial =
let f (a, b) = FL.Partial $ Tuple3' a b (0 :: Int)
in fmap f $ liftIO $ RB.new n
done (Tuple3' rb rh i) = do
arr <- liftIO $ MA.newArray n
foldFunc i rh snoc' arr rb

View File

@ -93,6 +93,7 @@ import Control.Exception (assert)
import Control.DeepSeq (NFData(..))
import Control.Monad (when, void)
import Control.Monad.IO.Class (MonadIO(..))
import Data.Bifunctor (first)
import Data.Functor.Identity (runIdentity)
#if __GLASGOW_HASKELL__ < 808
import Data.Semigroup (Semigroup(..))
@ -674,8 +675,9 @@ lpackArraysChunksOf n (Fold step1 initial1 extract1) =
-- XXX we can pass the module string from the higher level API
error $ "Streamly.Internal.Data.Array.Storable.Foreign.Mut.Types.packArraysChunksOf: the size of "
++ "arrays [" ++ show n ++ "] must be a natural number"
r1 <- initial1
return (Tuple' Nothing r1)
r <- initial1
return $ first (Tuple' Nothing) r
extract (Tuple' Nothing r1) = extract1 r1
extract (Tuple' (Just buf) r1) = do
@ -693,8 +695,8 @@ lpackArraysChunksOf n (Fold step1 initial1 extract1) =
FL.Done _ -> return $ FL.Done ()
FL.Partial s -> do
extract1 s
r1' <- initial1
return $ FL.Partial $ Tuple' Nothing r1'
res <- initial1
return $ first (Tuple' Nothing) res
else return $ FL.Partial $ Tuple' (Just arr) r1
step (Tuple' (Just buf) r1) arr = do
@ -712,8 +714,8 @@ lpackArraysChunksOf n (Fold step1 initial1 extract1) =
FL.Done _ -> return $ FL.Done ()
FL.Partial s -> do
extract1 s
r1' <- initial1
return $ FL.Partial $ Tuple' Nothing r1'
res <- initial1
return $ first (Tuple' Nothing) res
else return $ FL.Partial $ Tuple' (Just buf'') r1
#if !defined(mingw32_HOST_OS)
@ -1090,7 +1092,7 @@ writeNAllocWith alloc n = Fold step initial extract
where
initial = liftIO $ alloc (max n 0)
initial = FL.Partial <$> liftIO (alloc (max n 0))
step arr@(Array _ end bound) _ | end == bound = return $ FL.Done arr
step (Array start end bound) x = do
liftIO $ poke end x
@ -1150,7 +1152,7 @@ writeNUnsafe n = Fold step initial extract
initial = do
(Array start end _) <- liftIO $ newArray (max n 0)
return $ ArrayUnsafe start end
return $ FL.Partial $ ArrayUnsafe start end
step (ArrayUnsafe start end) x = do
liftIO $ poke end x

View File

@ -19,8 +19,37 @@
--
module Streamly.Internal.Data.Either.Strict
( Either' (..)
, isLeft'
, isRight'
, fromLeft'
, fromRight'
)
where
-- | A strict 'Either'
data Either' a b = Left' !a | Right' !b deriving Show
-- | Return 'True' if the given value is a Left', 'False' otherwise.
{-# INLINABLE isLeft' #-}
isLeft' :: Either' a b -> Bool
isLeft' (Left' _) = True
isLeft' (Right' _) = False
-- | Return 'True' if the given value is a Right', 'False' otherwise.
{-# INLINABLE isRight' #-}
isRight' :: Either' a b -> Bool
isRight' (Left' _) = False
isRight' (Right' _) = True
-- XXX This is partial. We can use a default value instead.
-- | Return the contents of a Left'-value or errors out.
{-# INLINABLE fromLeft' #-}
fromLeft' :: Either' a b -> a
fromLeft' (Left' a) = a
fromLeft' _ = error "fromLeft' expecting a Left'-value"
-- | Return the contents of a Right'-value or errors out.
{-# INLINABLE fromRight' #-}
fromRight' :: Either' a b -> b
fromRight' (Right' b) = b
fromRight' _ = error "fromRight' expecting a Right'-value"

File diff suppressed because it is too large Load Diff

View File

@ -173,8 +173,6 @@ module Streamly.Internal.Data.Fold.Types
, Fold2 (..)
, simplify
, toListRevF -- experimental
-- $toListRevF
-- * Generators
, yield
@ -295,7 +293,7 @@ instance Functor (Step s) where
data Fold m a b =
-- | @Fold @ @ step @ @ initial @ @ extract@
forall s. Fold (s -> a -> m (Step s b)) (m s) (s -> m b)
forall s. Fold (s -> a -> m (Step s b)) (m (Step s b)) (s -> m b)
-- | Experimental type to provide a side input to the fold for generating the
-- initial state. For example, if we have to fold chunks of a stream and write
@ -309,15 +307,16 @@ data Fold2 m c a b =
-- | Convert more general type 'Fold2' into a simpler type 'Fold'
simplify :: Functor m => Fold2 m c a b -> c -> Fold m a b
simplify (Fold2 step inject extract) c =
Fold (\x a -> Partial <$> step x a) (inject c) extract
Fold (\x a -> Partial <$> step x a) (Partial <$> inject c) extract
-- | Maps a function on the output of the fold (the type @b@).
instance Functor m => Functor (Fold m a) where
{-# INLINE fmap #-}
fmap f (Fold step1 initial extract) = Fold step initial (fmap2 f extract)
fmap f (Fold step1 initial1 extract) = Fold step initial (fmap2 f extract)
where
initial = fmap2 f initial1
step s b = fmap2 f (step1 s b)
fmap2 g = fmap (fmap g)
@ -328,9 +327,8 @@ instance Functor m => Functor (Fold m a) where
-- /Unimplemented/
--
{-# INLINE yield #-}
yield :: -- Monad m =>
b -> Fold m a b
yield = undefined
yield :: Applicative m => b -> Fold m a b
yield b = Fold undefined (pure $ Done b) pure
-- This is the dual of stream "yieldM".
--
@ -340,12 +338,11 @@ yield = undefined
-- /Unimplemented/
--
{-# INLINE yieldM #-}
yieldM :: -- Monad m =>
m b -> Fold m a b
yieldM = undefined
yieldM :: Applicative m => m b -> Fold m a b
yieldM b = Fold undefined (Done <$> b) pure
{-# ANN type Step Fuse #-}
data SeqFoldState sl f sr = SeqFoldL sl | SeqFoldR f sr
data SeqFoldState sl f sr = SeqFoldL !sl | SeqFoldR !f !sr
-- | Sequential fold application. Apply two folds sequentially to an input
-- stream. The input is provided to the first fold, when it is done the
@ -366,13 +363,21 @@ splitWith func (Fold stepL initialL extractL) (Fold stepR initialR extractR) =
where
initial = SeqFoldL <$> initialL
initial = do
resL <- initialL
case resL of
Partial sl -> return $ Partial $ SeqFoldL sl
Done bl -> do
resR <- initialR
return $ bimap (SeqFoldR (func bl)) (func bl) resR
step (SeqFoldL st) a = do
r <- stepL st a
case r of
Partial s -> return $ Partial (SeqFoldL s)
Done b -> Partial <$> (SeqFoldR (func b) <$> initialR)
Done b -> do
res <- initialR
return $ bimap (SeqFoldR (func b)) (func b) res
step (SeqFoldR f st) a = do
r <- stepR st a
return
@ -383,9 +388,12 @@ splitWith func (Fold stepL initialL extractL) (Fold stepR initialR extractR) =
extract (SeqFoldR f sR) = fmap f (extractR sR)
extract (SeqFoldL sL) = do
rL <- extractL sL
sR <- initialR
rR <- extractR sR
return $ func rL rR
res <- initialR
case res of
Partial sR -> do
rR <- extractR sR
return $ func rL rR
Done rR -> return $ func rL rR
-- | Same as applicative '*>'. Run two folds serially one after the other
-- discarding the result of the first.
@ -407,7 +415,7 @@ data GenericRunner sL sR bL bR
-- folds and combines their output using the supplied function.
instance Monad m => Applicative (Fold m a) where
{-# INLINE pure #-}
pure b = Fold (\() _ -> pure $ Done b) (pure ()) (\() -> pure b)
pure = yield
{-# INLINE (<*>) #-}
(<*>) = teeWith ($)
@ -425,9 +433,16 @@ teeWith f (Fold stepL beginL doneL) (Fold stepR beginR doneR) =
where
begin = do
sL <- beginL
sR <- beginR
return $ RunBoth sL sR
resL <- beginL
resR <- beginR
return
$ case resL of
Partial sl ->
Partial
$ case resR of
Partial sr -> RunBoth sl sr
Done br -> RunLeft sl br
Done bl -> bimap (RunRight bl) (f bl) resR
step (RunBoth sL sR) a = do
resL <- stepL sL a
@ -629,22 +644,6 @@ instance (Monad m, Floating b) => Floating (Fold m a b) where
-- Internal APIs
------------------------------------------------------------------------------
-- $toListRevF
-- This is more efficient than 'Streamly.Internal.Data.Fold.toList'. toList is
-- exactly the same as reversing the list after 'toListRevF'.
-- | Buffers the input stream to a list in the reverse order of the input.
--
-- /Warning!/ working on large lists accumulated as buffers in memory could be
-- very inefficient, consider using "Streamly.Array" instead.
--
-- @since 0.7.0
-- xn : ... : x2 : x1 : []
{-# INLINABLE toListRevF #-}
toListRevF :: Monad m => Fold m a [a]
toListRevF = Fold (\xs x -> return $ Partial $ x:xs) (return []) return
-- | @(lmap f fold)@ maps the function @f@ on the input of the fold.
--
-- >>> S.fold (FL.lmap (\x -> x * x) FL.sum) (S.enumerateFromTo 1 100)
@ -720,30 +719,35 @@ ltake n (Fold fstep finitial fextract) = Fold step initial extract
where
initial = Tuple' 0 <$> finitial
initial = do
res <- finitial
case res of
Partial s ->
if n > 0
then return $ Partial $ Tuple' 0 s
else Done <$> fextract s
Done b -> return $ Done b
step (Tuple' i r) a
| i < n = do
res <- fstep r a
case res of
Partial sres -> do
let i1 = i + 1
s1 = Tuple' i1 sres
if i1 < n
then return $ Partial s1
else Done <$> fextract sres
Done bres -> return $ Done bres
-- XXX take 0 case is broken currently
| otherwise = Done <$> fextract r
step (Tuple' i r) a = do
res <- fstep r a
case res of
Partial sres -> do
let i1 = i + 1
s1 = Tuple' i1 sres
if i1 < n
then return $ Partial s1
else Done <$> fextract sres
Done bres -> return $ Done bres
extract (Tuple' _ r) = fextract r
------------------------------------------------------------------------------
-- Nesting
------------------------------------------------------------------------------
--
-- | Modify the fold such that when the fold is done, instead of returning the
-- accumulator, it returns a fold. The returned fold starts from where we left
-- | Modify the fold such that it returns a new 'Fold' instead of the output.
-- If the fold was already done the returned fold would always yield the
-- result. If the fold was partial, the returned fold starts from where we left
-- i.e. it uses the last accumulator value as the initial value of the
-- accumulator. Thus we can resume the fold later and feed it more input.
--
@ -755,11 +759,15 @@ ltake n (Fold fstep finitial fextract) = Fold step initial extract
--
-- @since 0.7.0
{-# INLINABLE duplicate #-}
duplicate ::
-- Monad m =>
Fold m a b -> Fold m a (Fold m a b)
-- XXX This should change once we have step type in the initial element
duplicate _ = undefined
duplicate :: Monad m => Fold m a b -> Fold m a (Fold m a b)
duplicate (Fold step1 initial1 extract1) =
Fold step initial (\s -> pure $ Fold step1 (pure $ Partial s) extract1)
where
initial = second yield <$> initial1
step s a = second yield <$> step1 s a
-- | Run the initialization effect of a fold. The returned fold would use the
-- value returned by this effect as its initial value.
@ -773,11 +781,16 @@ initialize (Fold step initial extract) = do
-- | Run one step of a fold and store the accumulator as an initial value in
-- the returned fold.
{-# INLINABLE runStep #-}
runStep ::
-- Monad m =>
Fold m a b -> a -> m (Fold m a b)
-- XXX This should change once we have step type in the initial element
runStep _ _ = undefined
runStep :: Monad m => Fold m a b -> a -> m (Fold m a b)
runStep (Fold step initial extract) a = return $ Fold step initial1 extract
where
initial1 = do
res <- initial
case res of
Partial fs -> step fs a
b@(Done _) -> return b
------------------------------------------------------------------------------
-- Parsing
@ -799,32 +812,51 @@ runStep _ _ = undefined
--
{-# INLINE many #-}
many :: Monad m => Fold m b c -> Fold m a b -> Fold m a c
many (Fold fstep finitial fextract) (Fold step1 initial1 extract1) =
many (Fold cstep cinitial cextract) (Fold sstep sinitial sextract) =
Fold step initial extract
where
initial = Tuple' <$> initial1 <*> finitial
-- cs = collect state
-- ss = split state
-- cres = collect state result
-- sres = split state result
-- cb = collect done
-- sb = split done
-- Caution! There is mutual recursion here, inlining the right functions is
-- important.
{-# INLINE handleSplitStep #-}
handleSplitStep cs sres =
case sres of
Partial ss1 -> return $ Partial $ Tuple' ss1 cs
Done sb -> runCollector cs sb
{-# INLINE handleCollectStep #-}
handleCollectStep cres =
case cres of
Partial cs -> do
sres <- sinitial
handleSplitStep cs sres
Done cb -> return $ Done cb
-- Do not inline this
runCollector cs sb = cstep cs sb >>= handleCollectStep
initial = cinitial >>= handleCollectStep
{-# INLINE step #-}
step (Tuple' st fs) a = do
r <- step1 st a
case r of
Partial s -> return $ Partial (Tuple' s fs)
Done b -> do
s <- initial1
fs1 <- fstep fs b
return
$ case fs1 of
Partial s1 -> Partial (Tuple' s s1)
Done b1 -> Done b1
step (Tuple' ss cs) a = do
sres <- sstep ss a
handleSplitStep cs sres
extract (Tuple' s fs) = do
b <- extract1 s
acc <- fstep fs b
case acc of
Partial s1 -> fextract s1
Done x -> return x
extract (Tuple' ss cs) = do
sb <- sextract ss
cres <- cstep cs sb
case cres of
Partial s -> cextract s
Done b -> return b
-- | @lchunksOf n split collect@ repeatedly applies the @split@ fold to chunks
-- of @n@ items in the input stream and supplies the result to the @collect@
@ -847,7 +879,13 @@ lchunksOf2 n (Fold step1 initial1 extract1) (Fold2 step2 inject2 extract2) =
where
inject' x = Tuple3' 0 <$> initial1 <*> inject2 x
loopUntilPartial s = do
res <- initial1
case res of
Partial fs -> return $ Tuple3' 0 fs s
Done _ -> loopUntilPartial s
inject' x = inject2 x >>= loopUntilPartial
step' (Tuple3' i r1 r2) a =
if i < n
@ -855,20 +893,10 @@ lchunksOf2 n (Fold step1 initial1 extract1) (Fold2 step2 inject2 extract2) =
res <- step1 r1 a
case res of
Partial s -> return $ Tuple3' (i + 1) s r2
Done b -> do
s <- initial1
r21 <- step2 r2 b
return $ Tuple3' 0 s r21
else do
res <- extract1 r1
acc2 <- step2 r2 res
i1 <- initial1
return $ Tuple3' 0 i1 acc2
Done b -> step2 r2 b >>= loopUntilPartial
else extract1 r1 >>= step2 r2 >>= loopUntilPartial
extract' (Tuple3' _ r1 r2) = do
res <- extract1 r1
acc2 <- step2 r2 res
extract2 acc2
extract' (Tuple3' _ r1 r2) = extract1 r1 >>= step2 r2 >>= extract2
-- | @takeByTime n fold@ uses @fold@ to fold the input items arriving within a
-- window of first @n@ seconds.
@ -884,18 +912,21 @@ takeByTime n (Fold step initial done) = Fold step' initial' done'
where
initial' = do
s <- initial
mv <- liftIO $ newMVar False
t <-
control $ \run ->
mask $ \restore -> do
tid <-
forkIO
$ catch
(restore $ void $ run (timerThread mv))
(handleChildException mv)
run (return tid)
return $ Tuple3' s mv t
res <- initial
case res of
Partial s -> do
mv <- liftIO $ newMVar False
t <-
control $ \run ->
mask $ \restore -> do
tid <-
forkIO
$ catch
(restore $ void $ run (timerThread mv))
(handleChildException mv)
run (return tid)
return $ Partial $ Tuple3' s mv t
Done b -> return $ Done b
step' (Tuple3' s mv t) a = do
val <- liftIO $ readMVar mv

View File

@ -20,6 +20,8 @@
module Streamly.Internal.Data.Maybe.Strict
( Maybe' (..)
, toMaybe
, isJust'
, fromJust'
)
where
@ -31,3 +33,16 @@ data Maybe' a = Just' !a | Nothing' deriving Show
toMaybe :: Maybe' a -> Maybe a
toMaybe Nothing' = Nothing
toMaybe (Just' a) = Just a
-- | Extract the element out of a Just' and throws an error if its argument is
-- Nothing'.
{-# INLINABLE fromJust' #-}
fromJust' :: Maybe' a -> a
fromJust' (Just' a) = a
fromJust' Nothing' = error "fromJust' cannot be run in Nothing'"
-- | Returns True iff its argument is of the form "Just' _".
{-# INLINABLE isJust' #-}
isJust' :: Maybe' a -> Bool
isJust' (Just' _) = True
isJust' Nothing' = False

View File

@ -175,16 +175,20 @@ import Streamly.Internal.Data.Parser.ParserD.Types
--
{-# INLINE fromFold #-}
fromFold :: Monad m => Fold m a b -> Parser m a b
fromFold (Fold fstep finitial fextract) = Parser step finitial fextract
fromFold (Fold fstep finitial fextract) = Parser step finitial extract
where
step s a = do
step (FL.Partial s) a = do
res <- fstep s a
return
$ case res of
FL.Partial s1 -> Partial 0 s1
FL.Partial s1 -> Partial 0 (FL.Partial s1)
FL.Done b -> Done 0 b
step (FL.Done b) _ = return $ Done 1 b
extract (FL.Partial s) = fextract s
extract (FL.Done b) = return b
-------------------------------------------------------------------------------
-- Failing Parsers
@ -293,7 +297,12 @@ takeBetween low high (Fold fstep finitial fextract) =
where
initial = Tuple' 0 <$> finitial
initial = do
res <- finitial
case res of
FL.Partial s -> return $ Tuple' 0 s
FL.Done _ ->
error "takeBetween: Done/Error in initial not implemented yet."
step (Tuple' i s) a
| low > high =
@ -341,25 +350,30 @@ takeBetween low high (Fold fstep finitial fextract) =
--
{-# INLINE takeEQ #-}
takeEQ :: MonadThrow m => Int -> Fold m a b -> Parser m a b
takeEQ cnt (Fold fstep finitial fextract) = Parser step initial extract
takeEQ n (Fold fstep finitial fextract) = Parser step initial extract
where
n = max cnt 0
cnt = max n 0
initial = Tuple' 0 <$> finitial
initial = do
res <- finitial
case res of
FL.Partial s -> return $ Tuple' 0 s
FL.Done _ ->
error "takeEQ: Done/Error in initial not implemented yet."
step (Tuple' i r) a
| i1 < n = do
| i1 < cnt = do
res <- fstep r a
return
$ case res of
FL.Partial s -> Continue 0 $ Tuple' i1 s
FL.Done _ ->
Error
$ "takeEQ: the collecting fold terminated after "
++ "consuming" ++ show i1 ++ " elements"
| i1 == n = do
$ "takeEQ: Expecting exactly " ++ show cnt
++ " elements, fold terminated on " ++ show i1
| i1 == cnt = do
res <- fstep r a
Done 0
<$> case res of
@ -374,12 +388,12 @@ takeEQ cnt (Fold fstep finitial fextract) = Parser step initial extract
i1 = i + 1
extract (Tuple' i r)
| i == 0 && n == 0 = fextract r
| i == 0 && cnt == 0 = fextract r
| otherwise =
throwM
$ ParseError
$ "takeEQ: Expecting exactly "
++ show n ++ " elements, got " ++ show i
$ "takeEQ: Expecting exactly " ++ show cnt
++ " elements, input terminated on " ++ show i
-- | See 'Streamly.Internal.Data.Parser.takeGE'.
--
@ -387,23 +401,28 @@ takeEQ cnt (Fold fstep finitial fextract) = Parser step initial extract
--
{-# INLINE takeGE #-}
takeGE :: MonadThrow m => Int -> Fold m a b -> Parser m a b
takeGE cnt (Fold fstep finitial fextract) = Parser step initial extract
takeGE n (Fold fstep finitial fextract) = Parser step initial extract
where
n = max cnt 0
initial = Tuple' 0 <$> finitial
cnt = max n 0
initial = do
res <- finitial
case res of
FL.Partial s -> return $ Tuple' 0 s
FL.Done _ ->
error "takeGE: Done/Error in initial not implemented yet."
step (Tuple' i r) a
| i1 < n = do
| i1 < cnt = do
res <- fstep r a
return
$ case res of
FL.Partial s -> Continue 0 $ Tuple' i1 s
FL.Done _ ->
Error
$ "takeGE: the collecting fold terminated after "
++ "consuming " ++ show i1 ++ " elements"
$ "takeGE: Expecting at least " ++ show cnt
++ " elements, fold terminated on " ++ show i1
| otherwise = do
res <- fstep r a
return
@ -416,12 +435,12 @@ takeGE cnt (Fold fstep finitial fextract) = Parser step initial extract
i1 = i + 1
extract (Tuple' i r)
| i >= n = fextract r
| i >= cnt = fextract r
| otherwise =
throwM
$ ParseError
$ "takeGE: Expecting at least "
++ show n ++ " elements, got " ++ show i
$ "takeGE: Expecting at least " ++ show cnt
++ " elements, input terminated on " ++ show i
-- | See 'Streamly.Internal.Data.Parser.takeWhile'.
--
@ -434,7 +453,12 @@ takeWhile predicate (Fold fstep finitial fextract) =
where
initial = finitial
initial = do
res <- finitial
case res of
FL.Partial s -> return s
FL.Done _ ->
error "takeWhile: Done/Error in initial not implemented yet."
step s a =
if predicate a
@ -460,15 +484,19 @@ takeWhile1 predicate (Fold fstep finitial fextract) =
initial = return Nothing
step Nothing a =
if predicate a
then do
s <- finitial
sr <- fstep s a
return
$ case sr of
FL.Partial r -> Partial 0 (Just r)
FL.Done b -> Done 0 b
else return $ Error "takeWhile1: predicate failed on first element"
let err = Error "takeWhile1: predicate failed on first element"
in if predicate a
then do
res <- finitial
case res of
FL.Partial s -> do
sr <- fstep s a
return
$ case sr of
FL.Partial s1 -> Partial 0 (Just s1)
FL.Done b -> Done 0 b
FL.Done _ -> return err
else return err
step (Just s) a =
if predicate a
then do
@ -533,7 +561,12 @@ wordBy predicate (Fold fstep finitial fextract) = Parser step initial extract
FL.Partial s1 -> Partial 0 $ WBWord s1
FL.Done b -> Done 0 b
initial = WBLeft <$> finitial
initial = do
res <- finitial
return
$ case res of
FL.Partial s -> WBLeft s
FL.Done _ -> error "wordBy: Done in initial not implemented"
step (WBLeft s) a =
if not (predicate a)
@ -575,7 +608,12 @@ groupBy cmp (Fold fstep finitial fextract) = Parser step initial extract
FL.Done b -> Done 0 b
FL.Partial s1 -> Partial 0 (GroupByGrouping a0 s1)
initial = GroupByInit <$> finitial
initial = do
res <- finitial
return
$ case res of
FL.Partial s -> GroupByInit s
FL.Done _ -> error "groupBy: Done in initial not implemented"
step (GroupByInit s) a = grouper s a a
step (GroupByGrouping a0 s) a =
@ -763,7 +801,11 @@ manyTill (Fold fstep finitial fextract)
where
initial = ManyTillR 0 <$> finitial <*> initialR
initial = do
res <- finitial
case res of
FL.Partial fs -> ManyTillR 0 fs <$> initialR
FL.Done _ -> error "manyTill: Done in initial not implemented"
step (ManyTillR cnt fs st) a = do
r <- stepR st a

View File

@ -443,8 +443,12 @@ splitMany (Fold fstep finitial fextract) (Parser step1 initial1 extract1) =
initial = do
ps <- initial1 -- parse state
fs <- finitial -- fold state
pure (Tuple3' ps (0 :: Int) fs)
res <- finitial -- fold state
return
$ case res of
FL.Partial fs -> Tuple3' ps (0 :: Int) fs
FL.Done _ ->
error "splitMany: Done in initial not implemented"
{-# INLINE step #-}
step (Tuple3' st cnt fs) a = do
@ -493,8 +497,12 @@ splitSome (Fold fstep finitial fextract) (Parser step1 initial1 extract1) =
initial = do
ps <- initial1 -- parse state
fs <- finitial -- fold state
pure (Tuple3' ps (0 :: Int) (Left fs))
res <- finitial -- fold state
return
$ case res of
FL.Partial fs -> Tuple3' ps (0 :: Int) (Left fs)
FL.Done _ ->
error "splitSome: Done/Error in initial not implemented"
{-# INLINE step #-}
step (Tuple3' st cnt (Left fs)) a = do

View File

@ -80,7 +80,7 @@ import qualified Data.Map.Strict as Map
toFold :: Monad m => Sink m a -> Fold m a ()
toFold (Sink f) = Fold step begin done
where
begin = return ()
begin = return $ Partial ()
step _ a = Partial <$> f a
done _ = return ()

View File

@ -112,7 +112,7 @@ writeN limit = Fold step initial extract
initial = do
marr <- liftIO $ newSmallArray limit bottomElement
return (Tuple' marr 0)
return $ FL.Partial (Tuple' marr 0)
step st@(Tuple' marr i) x
| i == limit = FL.Done <$> extract st

View File

@ -759,9 +759,7 @@ toHandle h = go
-- /Internal/
{-# INLINE toStream #-}
toStream :: Monad m => Fold m a (SerialT Identity a)
toStream = Fold (\f x -> return $ FL.Partial $ f . (x `K.cons`))
(return id)
(return . ($ K.nil))
toStream = FL.mkAccum (\f x -> f . (x `K.cons`)) id ($ K.nil)
-- This is more efficient than 'toStream'. toStream is exactly the same as
-- reversing the stream after toStreamRev.
@ -777,7 +775,7 @@ toStream = Fold (\f x -> return $ FL.Partial $ f . (x `K.cons`))
-- xn : ... : x2 : x1 : []
{-# INLINABLE toStreamRev #-}
toStreamRev :: Monad m => Fold m a (SerialT Identity a)
toStreamRev = Fold (\xs x -> return $ FL.Partial $ x `K.cons` xs) (return K.nil) return
toStreamRev = FL.mkAccum_ (flip K.cons) K.nil
-- | Convert a stream to a pure stream.
--

View File

@ -1802,14 +1802,7 @@ classifySessionsBy tick tmout reset ejectPred
--
let curTime = max sessionEventTime timestamp
mOld = Map.lookup key sessionKeyValueMap
fs <-
case mOld of
Nothing -> initial
Just (Tuple' _ acc) -> return acc
res <- step fs value
case res of
FL.Done fb -> do
let done fb = do
-- deleting a key from the heap is expensive, so we never
-- delete a key from heap, we just purge it from the Map and it
-- gets purged from the heap on timeout. We just need an extra
@ -1827,7 +1820,7 @@ classifySessionsBy tick tmout reset ejectPred
, sessionKeyValueMap = mp
, sessionOutputStream = yield (key, fb)
}
FL.Partial fs1 -> do
partial fs1 = do
let acc = Tuple' timestamp fs1
(hp1, mp1, out1, cnt1) <- do
let vars = (sessionTimerHeap, sessionKeyValueMap,
@ -1858,6 +1851,17 @@ classifySessionsBy tick tmout reset ejectPred
, sessionKeyValueMap = mp2
, sessionOutputStream = out1
}
res0 <- do
case mOld of
Nothing -> initial
Just (Tuple' _ acc) -> return $ FL.Partial acc
case res0 of
FL.Done fb -> done fb
FL.Partial fs -> do
res <- step fs value
case res of
FL.Done fb -> done fb
FL.Partial fs1 -> partial fs1
-- Got a timer tick event
sstep sessionState@SessionState{..} Nothing =

View File

@ -1218,27 +1218,43 @@ parseIterate func seed (Stream step state) =
-- Grouping
------------------------------------------------------------------------------
data GroupByState st fs a b
= GroupingInit st
| GroupingDo st !fs
| GroupingInitWith st !a
| GroupingDoWith st !fs !a
| GroupingYield !b (GroupByState st fs a b)
| GroupingDone
{-# INLINE_NORMAL groupsBy #-}
groupsBy :: Monad m
=> (a -> a -> Bool)
-> Fold m a b
-> Stream m a
-> Stream m b
groupsBy cmp f (Stream step state) = Stream (stepOuter f) (Just state, Nothing)
groupsBy cmp (Fold fstep initial done) (Stream step state) =
Stream stepOuter (GroupingInit state)
where
{-# INLINE_LATE stepOuter #-}
stepOuter (Fold fstep initial done) gst (Just st, Nothing) = do
stepOuter _ (GroupingInit st) = do
-- XXX Note that if the stream stops without yielding a single element
-- in the group we discard the "initial" effect.
res <- initial
return
$ case res of
FL.Partial s -> Skip $ GroupingDo st s
FL.Done b -> Yield b $ GroupingInit st
stepOuter gst (GroupingDo st fs) = do
res <- step (adaptState gst) st
case res of
Yield x s -> do
fs <- initial
r <- fstep fs x
case r of
FL.Partial fs1 -> go SPEC x s fs1
FL.Done b -> return $ Yield b (Just s, Just x)
Skip s -> return $ Skip (Just s, Nothing)
FL.Done b -> return $ Yield b (GroupingInit s)
Skip s -> return $ Skip $ GroupingDo s fs
Stop -> return Stop
where
@ -1252,16 +1268,23 @@ groupsBy cmp f (Stream step state) = Stream (stepOuter f) (Just state, Nothing)
r <- fstep acc x
case r of
FL.Partial fs1 -> go SPEC prev s fs1
FL.Done b -> return $ Yield b (Just s, Just x)
else done acc >>= \r -> return $ Yield r (Just s, Just x)
FL.Done b -> return $ Yield b (GroupingInit s)
else do
r <- done acc
return $ Yield r (GroupingInitWith s x)
Skip s -> go SPEC prev s acc
Stop -> done acc >>= \r -> return $ Yield r (Nothing, Nothing)
stepOuter (Fold fstep initial done) gst (Just st, Just prev) = do
fs <- initial
r <- fstep fs prev
case r of
Stop -> done acc >>= \r -> return $ Yield r GroupingDone
stepOuter _ (GroupingInitWith st x) = do
res <- initial
return
$ case res of
FL.Partial s -> Skip $ GroupingDoWith st s x
FL.Done b -> Yield b $ GroupingInitWith st x
stepOuter gst (GroupingDoWith st fs prev) = do
res <- fstep fs prev
case res of
FL.Partial fs1 -> go SPEC st fs1
FL.Done b -> return $ Yield b (Just st, Nothing)
FL.Done b -> return $ Yield b (GroupingInit st)
where
@ -1275,11 +1298,14 @@ groupsBy cmp f (Stream step state) = Stream (stepOuter f) (Just state, Nothing)
r <- fstep acc x
case r of
FL.Partial fs1 -> go SPEC s fs1
FL.Done b -> return $ Yield b (Just s, Just x)
else done acc >>= \r -> return $ Yield r (Just s, Just x)
FL.Done b -> return $ Yield b (GroupingInit s)
else do
r <- done acc
return $ Yield r (GroupingInitWith s x)
Skip s -> go SPEC s acc
Stop -> done acc >>= \r -> return $ Yield r (Nothing, Nothing)
stepOuter _ _ (Nothing, _) = return Stop
Stop -> done acc >>= \r -> return $ Yield r GroupingDone
stepOuter _ (GroupingYield _ _) = error "groupsBy: Unreachable"
stepOuter _ GroupingDone = return Stop
{-# INLINE_NORMAL groupsRollingBy #-}
groupsRollingBy :: Monad m
@ -1287,22 +1313,29 @@ groupsRollingBy :: Monad m
-> Fold m a b
-> Stream m a
-> Stream m b
groupsRollingBy cmp f (Stream step state) =
Stream (stepOuter f) (Just state, Nothing)
groupsRollingBy cmp (Fold fstep initial done) (Stream step state) =
Stream stepOuter (GroupingInit state)
where
{-# INLINE_LATE stepOuter #-}
stepOuter (Fold fstep initial done) gst (Just st, Nothing) = do
stepOuter _ (GroupingInit st) = do
-- XXX Note that if the stream stops without yielding a single element
-- in the group we discard the "initial" effect.
res <- initial
return
$ case res of
FL.Partial fs -> Skip $ GroupingDo st fs
FL.Done fb -> Yield fb $ GroupingInit st
stepOuter gst (GroupingDo st fs) = do
res <- step (adaptState gst) st
case res of
Yield x s -> do
fs <- initial
r <- fstep fs x
case r of
FL.Partial fs1 -> go SPEC x s fs1
FL.Done b -> return $ Yield b (Just s, Just x)
Skip s -> return $ Skip $ (Just s, Nothing)
FL.Done fb -> return $ Yield fb (GroupingInit s)
Skip s -> return $ Skip $ GroupingDo s fs
Stop -> return Stop
where
@ -1316,62 +1349,104 @@ groupsRollingBy cmp f (Stream step state) =
r <- fstep acc x
case r of
FL.Partial fs1 -> go SPEC x s fs1
FL.Done b -> return $ Yield b (Just s, Just x)
else done acc >>= \r -> return $ Yield r (Just s, Just x)
FL.Done b -> return $ Yield b (GroupingInit s)
else do
r <- done acc
return $ Yield r (GroupingInitWith s x)
Skip s -> go SPEC prev s acc
Stop -> done acc >>= \r -> return $ Yield r (Nothing, Nothing)
stepOuter (Fold fstep initial done) gst (Just st, Just prev') = do
fs <- initial
r <- fstep fs prev'
case r of
FL.Partial fs1 -> go SPEC prev' st fs1
FL.Done b -> return $ Yield b (Just st, Nothing)
Stop -> done acc >>= \r -> return $ Yield r GroupingDone
stepOuter _ (GroupingInitWith st x) = do
res <- initial
return
$ case res of
FL.Partial s -> Skip $ GroupingDoWith st s x
FL.Done b -> Yield b $ GroupingInitWith st x
stepOuter gst (GroupingDoWith st fs previous) = do
res <- fstep fs previous
case res of
FL.Partial s -> go SPEC previous st s
FL.Done b -> return $ Yield b (GroupingInit st)
where
go !_ prevv stt !acc = do
-- XXX GHC: groupsBy has one less parameter in this go loop and it
-- fuses. However, groupsRollingBy does not fuse, removing the prev
-- parameter makes it fuse. Something needs to be fixed in GHC. The
-- workaround for this is noted in the comments below.
go !_ prev !stt !acc = do
res <- step (adaptState gst) stt
case res of
Yield x s -> do
if cmp prevv x
if cmp prev x
then do
r <- fstep acc x
case r of
FL.Partial fs1 -> go SPEC x s fs1
FL.Done b -> return $ Yield b (Just s, Just x)
else done acc >>= \r -> return $ Yield r (Just s, Just x)
Skip s -> go SPEC prevv s acc
Stop -> done acc >>= \r -> return $ Yield r (Nothing, Nothing)
stepOuter _ _ (Nothing, _) = return Stop
FL.Done b -> return $ Yield b (GroupingInit st)
else do
{-
r <- done acc
return $ Yield r (GroupingInitWith s x)
-}
-- The code above does not let groupBy fuse. We use the
-- alternative code below instead. Instead of jumping
-- to GroupingInitWith state, we unroll the code of
-- GroupingInitWith state here to help GHC with stream
-- fusion.
result <- initial
r <- done acc
return
$ Yield r
$ case result of
FL.Partial fsi -> GroupingDoWith s fsi x
FL.Done b -> GroupingYield b (GroupingInit s)
Skip s -> go SPEC prev s acc
Stop -> done acc >>= \r -> return $ Yield r GroupingDone
stepOuter _ (GroupingYield r next) = return $ Yield r next
stepOuter _ GroupingDone = return Stop
------------------------------------------------------------------------------
-- Splitting - by a predicate
------------------------------------------------------------------------------
data WordsByState s = WordsByJust s | WordsByNothing
data WordsByState st fs b
= WordsByInit st
| WordsByDo st !fs
| WordsByDone
| WordsByYield !b (WordsByState st fs b)
{-# INLINE_NORMAL wordsBy #-}
wordsBy :: Monad m => (a -> Bool) -> Fold m a b -> Stream m a -> Stream m b
wordsBy predicate f (Stream step state) =
Stream (stepOuter f) (WordsByJust state)
wordsBy predicate (Fold fstep initial done) (Stream step state) =
Stream stepOuter (WordsByInit state)
where
{-# INLINE_LATE stepOuter #-}
stepOuter (Fold fstep initial done) gst (WordsByJust st) = do
stepOuter _ (WordsByInit st) = do
res <- initial
return
$ case res of
FL.Partial s -> Skip $ WordsByDo st s
FL.Done b -> Yield b (WordsByInit st)
stepOuter gst (WordsByDo st fs) = do
res <- step (adaptState gst) st
case res of
Yield x s -> do
if predicate x
then return $ Skip (WordsByJust s)
then do
resi <- initial
return
$ case resi of
FL.Partial fs1 -> Skip $ WordsByDo s fs1
FL.Done b -> Yield b (WordsByInit s)
else do
fs <- initial
r <- fstep fs x
case r of
FL.Partial fs1 -> go SPEC s fs1
FL.Done b -> return $ Yield b (WordsByJust s)
Skip s -> return $ Skip $ WordsByJust s
FL.Done b -> return $ Yield b (WordsByInit s)
Skip s -> return $ Skip $ WordsByDo s fs
Stop -> return Stop
where
@ -1381,16 +1456,34 @@ wordsBy predicate f (Stream step state) =
case res of
Yield x s -> do
if predicate x
then done acc >>= \r -> return $ Yield r (WordsByJust s)
then do
{-
r <- done acc
return $ Yield r (WordsByInit s)
-}
-- The above code does not fuse well. Need to check why
-- GHC is not able to simplify it well. Using the code
-- below, instead of jumping through the WordsByInit
-- state always, we directly go to WordsByDo state in
-- the common case of Partial.
resi <- initial
r <- done acc
return
$ Yield r
$ case resi of
FL.Partial fs1 -> WordsByDo s fs1
FL.Done b -> WordsByYield b (WordsByInit s)
else do
r <- fstep acc x
case r of
FL.Partial fs1 -> go SPEC s fs1
FL.Done b -> return $ Yield b (WordsByJust s)
FL.Done b -> return $ Yield b (WordsByInit s)
Skip s -> go SPEC s acc
Stop -> done acc >>= \r -> return $ Yield r WordsByNothing
Stop -> done acc >>= \r -> return $ Yield r WordsByDone
stepOuter _ _ WordsByNothing = return Stop
stepOuter _ WordsByDone = return Stop
stepOuter _ (WordsByYield b next) = return $ Yield b next
------------------------------------------------------------------------------
-- Splitting on a sequence
@ -1413,25 +1506,30 @@ data SplitOptions = SplitOptions
}
-}
-- XXX using "fs" as the last arg in Constructors may simplify the code a bit,
-- because we can use the constructor directly without having to create "jump"
-- functions.
{-# ANN type SplitOnSeqState Fuse #-}
data SplitOnSeqState rb rh ck w fs s b x =
SplitOnSeqInit
| SplitOnSeqYield b (SplitOnSeqState rb rh ck w fs s b x)
| SplitOnSeqDone
| SplitOnSeqEmpty s
| SplitOnSeqEmpty !fs s
| SplitOnSeqSingle !fs s x
| SplitOnSeqWordInit s
| SplitOnSeqWordInit !fs s
| SplitOnSeqWordLoop !w s !fs
| SplitOnSeqWordDone Int !fs !w
| SplitOnSeqKRInit Int s rb !rh
| SplitOnSeqKRInit Int !fs s rb !rh
| SplitOnSeqKRLoop fs s rb !rh !ck
| SplitOnSeqKRCheck fs s rb !rh
| SplitOnSeqKRDone Int !fs rb !rh
| SplitOnSeqReinit (fs -> SplitOnSeqState rb rh ck w fs s b x)
{-# INLINE_NORMAL splitOnSeq #-}
splitOnSeq
:: forall m a b. (MonadIO m, Storable a, Enum a, Eq a)
@ -1474,41 +1572,59 @@ splitOnSeq patArr (Fold fstep initial done) (Stream step state) =
skip = return . Skip
nextAfterInit nextGen stepRes =
case stepRes of
FL.Partial s -> nextGen s
FL.Done b -> SplitOnSeqYield b (SplitOnSeqReinit nextGen)
{-# INLINE yieldProceed #-}
yieldProceed nextGen fs =
initial >>= skip . SplitOnSeqYield fs . nextAfterInit nextGen
{-# INLINE_LATE stepOuter #-}
stepOuter _ SplitOnSeqInit =
if patLen == 0
then return $ Skip $ SplitOnSeqEmpty state
else if patLen == 1
then do
acc <- initial
pat <- liftIO $ A.unsafeIndexIO patArr 0
return $ Skip $ SplitOnSeqSingle acc state pat
else if sizeOf (undefined :: a) * patLen
<= sizeOf (undefined :: Word)
then return $ Skip $ SplitOnSeqWordInit state
else do
(rb, rhead) <- liftIO $ RB.new patLen
skip $ SplitOnSeqKRInit 0 state rb rhead
stepOuter _ SplitOnSeqInit = do
res <- initial
case res of
FL.Partial acc ->
if patLen == 0
then return $ Skip $ SplitOnSeqEmpty acc state
else if patLen == 1
then do
pat <- liftIO $ A.unsafeIndexIO patArr 0
return $ Skip $ SplitOnSeqSingle acc state pat
else if sizeOf (undefined :: a) * patLen
<= sizeOf (undefined :: Word)
then return $ Skip $ SplitOnSeqWordInit acc state
else do
(rb, rhead) <- liftIO $ RB.new patLen
skip $ SplitOnSeqKRInit 0 acc state rb rhead
FL.Done b -> skip $ SplitOnSeqYield b SplitOnSeqInit
stepOuter _ (SplitOnSeqYield x next) = return $ Yield x next
---------------------------
-- Checkpoint
---------------------------
stepOuter _ (SplitOnSeqReinit nextGen) =
initial >>= skip . nextAfterInit nextGen
---------------------------
-- Empty pattern
---------------------------
stepOuter gst (SplitOnSeqEmpty st) = do
stepOuter gst (SplitOnSeqEmpty acc st) = do
res <- step (adaptState gst) st
case res of
Yield x s -> do
fs <- initial
r <- fstep fs x
case r of
FL.Partial fs1 -> do
b <- done fs1
skip $ SplitOnSeqYield b (SplitOnSeqEmpty s)
FL.Done b -> skip $ SplitOnSeqYield b (SplitOnSeqEmpty s)
Skip s -> return $ Skip (SplitOnSeqEmpty s)
r <- fstep acc x
b1 <-
case r of
FL.Partial acc1 -> done acc1
FL.Done b -> return b
let jump c = SplitOnSeqEmpty c s
in yieldProceed jump b1
Skip s -> skip (SplitOnSeqEmpty acc s)
Stop -> return Stop
-----------------
@ -1525,18 +1641,14 @@ splitOnSeq patArr (Fold fstep initial done) (Stream step state) =
res <- step (adaptState gst) st
case res of
Yield x s -> do
let jump c = SplitOnSeqSingle c s pat
if pat == x
then do
r <- done fs
fs1 <- initial
return $ Skip $ SplitOnSeqYield r (SplitOnSeqSingle fs1 s pat)
then done fs >>= yieldProceed jump
else do
r <- fstep fs x
case r of
FL.Partial fs1 -> skip $ SplitOnSeqSingle fs1 s pat
FL.Done b -> do
fs1 <- initial
skip $ SplitOnSeqYield b (SplitOnSeqSingle fs1 s pat)
FL.Partial fs1 -> skip $ jump fs1
FL.Done b -> yieldProceed jump b
Skip s -> return $ Skip $ SplitOnSeqSingle fs s pat
Stop -> do
r <- done fs
@ -1555,11 +1667,10 @@ splitOnSeq patArr (Fold fstep initial done) (Stream step state) =
case r of
FL.Partial fs1 -> skip $ SplitOnSeqWordDone (n - 1) fs1 wrd
FL.Done b -> do
fs1 <- initial
let next = SplitOnSeqWordDone (n - 1) fs1 wrd
skip $ SplitOnSeqYield b next
let jump c = SplitOnSeqWordDone (n - 1) c wrd
yieldProceed jump b
stepOuter gst (SplitOnSeqWordInit st0) =
stepOuter gst (SplitOnSeqWordInit fs st0) =
go SPEC 0 0 st0
where
@ -1572,17 +1683,14 @@ splitOnSeq patArr (Fold fstep initial done) (Stream step state) =
let wrd1 = addToWord wrd x
if idx == maxIndex
then do
fs <- initial
if wrd1 .&. wordMask == wordPat
then do
r <- done fs
let next = SplitOnSeqWordInit s
skip $ SplitOnSeqYield r next
let jump c = SplitOnSeqWordInit c s
done fs >>= yieldProceed jump
else skip $ SplitOnSeqWordLoop wrd1 s fs
else go SPEC (idx + 1) wrd1 s
Skip s -> go SPEC idx wrd s
Stop -> do
fs <- initial
if idx /= 0
then skip $ SplitOnSeqWordDone idx fs wrd
else do
@ -1599,21 +1707,17 @@ splitOnSeq patArr (Fold fstep initial done) (Stream step state) =
res <- step (adaptState gst) st
case res of
Yield x s -> do
let wrd1 = addToWord wrd x
let jump c = SplitOnSeqWordInit c s
wrd1 = addToWord wrd x
old = (wordMask .&. wrd)
`shiftR` (elemBits * (patLen - 1))
r <- fstep fs (toEnum $ fromIntegral old)
case r of
FL.Partial fs1 -> do
if wrd1 .&. wordMask == wordPat
then do
b <- done fs1
let next = SplitOnSeqWordInit s
skip $ SplitOnSeqYield b next
then done fs1 >>= yieldProceed jump
else go SPEC wrd1 s fs1
FL.Done b ->
let next = SplitOnSeqWordInit s
in skip $ SplitOnSeqYield b next
FL.Done b -> yieldProceed jump b
Skip s -> go SPEC wrd s fs
Stop -> skip $ SplitOnSeqWordDone patLen fs wrd
@ -1621,7 +1725,7 @@ splitOnSeq patArr (Fold fstep initial done) (Stream step state) =
-- General Pattern - Karp Rabin
-------------------------------
stepOuter gst (SplitOnSeqKRInit idx st rb rh) = do
stepOuter gst (SplitOnSeqKRInit idx fs st rb rh) = do
res <- step (adaptState gst) st
case res of
Yield x s -> do
@ -1630,14 +1734,12 @@ splitOnSeq patArr (Fold fstep initial done) (Stream step state) =
then do
let fold = RB.unsafeFoldRing (RB.ringBound rb)
let !ringHash = fold addCksum 0 rb
fs <- initial
if ringHash == patHash
then skip $ SplitOnSeqKRCheck fs s rb rh1
else skip $ SplitOnSeqKRLoop fs s rb rh1 ringHash
else skip $ SplitOnSeqKRInit (idx + 1) s rb rh1
Skip s -> skip $ SplitOnSeqKRInit idx s rb rh
else skip $ SplitOnSeqKRInit (idx + 1) fs s rb rh1
Skip s -> skip $ SplitOnSeqKRInit idx fs s rb rh
Stop -> do
fs <- initial
skip $ SplitOnSeqKRDone idx fs rb (RB.startOf rb)
-- XXX The recursive "go" is more efficient than the state based recursion
@ -1662,9 +1764,10 @@ splitOnSeq patArr (Fold fstep initial done) (Stream step state) =
if cksum1 == patHash
then skip $ SplitOnSeqKRCheck fs1 s rb rh1
else go SPEC fs1 s rh1 cksum1
FL.Done b ->
let next = SplitOnSeqKRInit 0 s rb (RB.startOf rb)
in skip $ SplitOnSeqYield b next
FL.Done b -> do
let rst = RB.startOf rb
jump c = SplitOnSeqKRInit 0 c s rb rst
yieldProceed jump b
Skip s -> go SPEC fs s rh cksum
Stop -> skip $ SplitOnSeqKRDone patLen fs rb rh
@ -1698,9 +1801,11 @@ splitOnSeq patArr (Fold fstep initial done) (Stream step state) =
if RB.unsafeEqArray rb rh patArr
then do
r <- done fs
let next = SplitOnSeqKRInit 0 st rb (RB.startOf rb)
skip $ SplitOnSeqYield r next
let rst = RB.startOf rb
jump c = SplitOnSeqKRInit 0 c st rb rst
yieldProceed jump r
else skip $ SplitOnSeqKRLoop fs st rb rh patHash
stepOuter _ (SplitOnSeqKRDone 0 fs _ _) = do
r <- done fs
skip $ SplitOnSeqYield r SplitOnSeqDone
@ -1711,9 +1816,8 @@ splitOnSeq patArr (Fold fstep initial done) (Stream step state) =
case r of
FL.Partial fs1 -> skip $ SplitOnSeqKRDone (n - 1) fs1 rb rh1
FL.Done b -> do
fs1 <- initial
let next = SplitOnSeqKRDone (n - 1) fs1 rb rh1
skip $ SplitOnSeqYield b next
let jump c = SplitOnSeqKRDone (n - 1) c rb rh1
yieldProceed jump b
{-# ANN type SplitOnSuffixSeqState Fuse #-}
data SplitOnSuffixSeqState rb rh ck w fs s b x =
@ -1721,21 +1825,24 @@ data SplitOnSuffixSeqState rb rh ck w fs s b x =
| SplitOnSuffixSeqYield b (SplitOnSuffixSeqState rb rh ck w fs s b x)
| SplitOnSuffixSeqDone
| SplitOnSuffixSeqEmpty s
| SplitOnSuffixSeqEmpty !fs s
| SplitOnSuffixSeqSingleInit s x
| SplitOnSuffixSeqSingleInit !fs s x
| SplitOnSuffixSeqSingle !fs s x
| SplitOnSuffixSeqWordInit s
| SplitOnSuffixSeqWordInit !fs s
| SplitOnSuffixSeqWordLoop !w s !fs
| SplitOnSuffixSeqWordDone Int !fs !w
| SplitOnSuffixSeqKRInit Int s rb !rh
| SplitOnSuffixSeqKRInit Int !fs s rb !rh
| SplitOnSuffixSeqKRInit1 !fs s rb !rh
| SplitOnSuffixSeqKRLoop fs s rb !rh !ck
| SplitOnSuffixSeqKRCheck fs s rb !rh
| SplitOnSuffixSeqKRDone Int !fs rb !rh
| SplitOnSuffixSeqReinit
(fs -> SplitOnSuffixSeqState rb rh ck w fs s b x)
{-# INLINE_NORMAL splitOnSuffixSeq #-}
splitOnSuffixSeq
:: forall m a b. (MonadIO m, Storable a, Enum a, Eq a)
@ -1765,27 +1872,33 @@ splitOnSuffixSeq withSep patArr (Fold fstep initial done) (Stream step state) =
addToWord wd a = (wd `shiftL` elemBits) .|. fromIntegral (fromEnum a)
nextAfterInit nextGen stepRes =
case stepRes of
FL.Partial s -> nextGen s
FL.Done b ->
SplitOnSuffixSeqYield b (SplitOnSuffixSeqReinit nextGen)
{-# INLINE yieldProceed #-}
yieldProceed nextGen fs =
initial >>= skip . SplitOnSuffixSeqYield fs . nextAfterInit nextGen
-- For single element pattern case
{-# INLINE processYieldSingle #-}
processYieldSingle pat x s fs =
processYieldSingle pat x s fs = do
let jump c = SplitOnSuffixSeqSingleInit c s pat
if pat == x
then do
r <- if withSep then fstep fs x else return $ FL.Partial fs
case r of
FL.Partial fs1 -> do
b <- done fs1
let next = SplitOnSuffixSeqSingleInit s pat
skip $ SplitOnSuffixSeqYield b next
FL.Done b ->
let next = SplitOnSuffixSeqSingleInit s pat
in skip $ SplitOnSuffixSeqYield b next
b1 <-
case r of
FL.Partial fs1 -> done fs1
FL.Done b -> return b
yieldProceed jump b1
else do
r <- fstep fs x
case r of
FL.Partial fs1 -> skip $ SplitOnSuffixSeqSingle fs1 s pat
FL.Done b ->
let next = SplitOnSuffixSeqSingleInit s pat
in skip $ SplitOnSuffixSeqYield b next
FL.Done b -> yieldProceed jump b
-- For Rabin-Karp search
k = 2891336453 :: Word32
@ -1802,39 +1915,49 @@ splitOnSuffixSeq withSep patArr (Fold fstep initial done) (Stream step state) =
skip = return . Skip
{-# INLINE_LATE stepOuter #-}
stepOuter _ SplitOnSuffixSeqInit =
if patLen == 0
then skip $ SplitOnSuffixSeqEmpty state
else if patLen == 1
then do
pat <- liftIO $ A.unsafeIndexIO patArr 0
skip $ SplitOnSuffixSeqSingleInit state pat
else if sizeOf (undefined :: a) * patLen
<= sizeOf (undefined :: Word)
then skip $ SplitOnSuffixSeqWordInit state
else do
(rb, rhead) <- liftIO $ RB.new patLen
skip $ SplitOnSuffixSeqKRInit 0 state rb rhead
stepOuter _ SplitOnSuffixSeqInit = do
res <- initial
case res of
FL.Partial fs ->
if patLen == 0
then skip $ SplitOnSuffixSeqEmpty fs state
else if patLen == 1
then do
pat <- liftIO $ A.unsafeIndexIO patArr 0
skip $ SplitOnSuffixSeqSingleInit fs state pat
else if sizeOf (undefined :: a) * patLen
<= sizeOf (undefined :: Word)
then skip $ SplitOnSuffixSeqWordInit fs state
else do
(rb, rhead) <- liftIO $ RB.new patLen
skip $ SplitOnSuffixSeqKRInit 0 fs state rb rhead
FL.Done fb -> skip $ SplitOnSuffixSeqYield fb SplitOnSuffixSeqInit
stepOuter _ (SplitOnSuffixSeqYield x next) = return $ Yield x next
---------------------------
-- Reinit
---------------------------
stepOuter _ (SplitOnSuffixSeqReinit nextGen) =
initial >>= skip . nextAfterInit nextGen
---------------------------
-- Empty pattern
---------------------------
stepOuter gst (SplitOnSuffixSeqEmpty st) = do
stepOuter gst (SplitOnSuffixSeqEmpty acc st) = do
res <- step (adaptState gst) st
case res of
Yield x s -> do
acc <- initial
let jump c = SplitOnSuffixSeqEmpty c s
r <- fstep acc x
case r of
FL.Partial fs -> do
b <- done fs
skip $ SplitOnSuffixSeqYield b (SplitOnSuffixSeqEmpty s)
FL.Done b ->
skip $ SplitOnSuffixSeqYield b (SplitOnSuffixSeqEmpty s)
Skip s -> skip (SplitOnSuffixSeqEmpty s)
b1 <-
case r of
FL.Partial fs -> done fs
FL.Done b -> return b
yieldProceed jump b1
Skip s -> skip (SplitOnSuffixSeqEmpty acc s)
Stop -> return Stop
-----------------
@ -1847,11 +1970,11 @@ splitOnSuffixSeq withSep patArr (Fold fstep initial done) (Stream step state) =
-- Single Pattern
-----------------
stepOuter gst (SplitOnSuffixSeqSingleInit st pat) = do
stepOuter gst (SplitOnSuffixSeqSingleInit fs st pat) = do
res <- step (adaptState gst) st
case res of
Yield x s -> initial >>= processYieldSingle pat x s
Skip s -> skip $ SplitOnSuffixSeqSingleInit s pat
Yield x s -> processYieldSingle pat x s fs
Skip s -> skip $ SplitOnSuffixSeqSingleInit fs s pat
Stop -> return Stop
stepOuter gst (SplitOnSuffixSeqSingle fs st pat) = do
@ -1876,23 +1999,21 @@ splitOnSuffixSeq withSep patArr (Fold fstep initial done) (Stream step state) =
case r of
FL.Partial fs1 -> skip $ SplitOnSuffixSeqWordDone (n - 1) fs1 wrd
FL.Done b -> do
fs1 <- initial
let next = SplitOnSuffixSeqWordDone (n - 1) fs1 wrd
skip $ SplitOnSuffixSeqYield b next
let jump c = SplitOnSuffixSeqWordDone (n - 1) c wrd
yieldProceed jump b
stepOuter gst (SplitOnSuffixSeqWordInit st0) = do
stepOuter gst (SplitOnSuffixSeqWordInit fs0 st0) = do
res <- step (adaptState gst) st0
case res of
Yield x s -> do
fs <- initial
let wrd = addToWord 0 x
r <- if withSep then fstep fs x else return $ FL.Partial fs
r <- if withSep then fstep fs0 x else return $ FL.Partial fs0
case r of
FL.Partial fs1 -> go SPEC 1 wrd s fs1
FL.Done b -> do
let next = SplitOnSuffixSeqWordInit s
in skip $ SplitOnSuffixSeqYield b next
Skip s -> skip (SplitOnSuffixSeqWordInit s)
let jump c = SplitOnSuffixSeqWordInit c s
yieldProceed jump b
Skip s -> skip (SplitOnSuffixSeqWordInit fs0 s)
Stop -> return Stop
where
@ -1902,6 +2023,7 @@ splitOnSuffixSeq withSep patArr (Fold fstep initial done) (Stream step state) =
res <- step (adaptState gst) st
case res of
Yield x s -> do
let jump c = SplitOnSuffixSeqWordInit c s
let wrd1 = addToWord wrd x
r <- if withSep then fstep fs x else return $ FL.Partial fs
case r of
@ -1910,13 +2032,8 @@ splitOnSuffixSeq withSep patArr (Fold fstep initial done) (Stream step state) =
then go SPEC (idx + 1) wrd1 s fs1
else if wrd1 .&. wordMask /= wordPat
then skip $ SplitOnSuffixSeqWordLoop wrd1 s fs1
else do
b <- done fs
let next = SplitOnSuffixSeqWordInit s
skip $ SplitOnSuffixSeqYield b next
FL.Done b ->
let next = SplitOnSuffixSeqWordInit s
in skip $ SplitOnSuffixSeqYield b next
else do done fs >>= yieldProceed jump
FL.Done b -> yieldProceed jump b
Skip s -> go SPEC idx wrd s fs
Stop -> skip $ SplitOnSuffixSeqWordDone idx fs wrd
@ -1930,7 +2047,8 @@ splitOnSuffixSeq withSep patArr (Fold fstep initial done) (Stream step state) =
res <- step (adaptState gst) st
case res of
Yield x s -> do
let wrd1 = addToWord wrd x
let jump c = SplitOnSuffixSeqWordInit c s
wrd1 = addToWord wrd x
old = (wordMask .&. wrd)
`shiftR` (elemBits * (patLen - 1))
r <-
@ -1940,14 +2058,9 @@ splitOnSuffixSeq withSep patArr (Fold fstep initial done) (Stream step state) =
case r of
FL.Partial fs1 ->
if wrd1 .&. wordMask == wordPat
then do
b <- done fs1
let next = SplitOnSuffixSeqWordInit s
skip $ SplitOnSuffixSeqYield b next
then done fs1 >>= yieldProceed jump
else go SPEC wrd1 s fs1
FL.Done b ->
let next = SplitOnSuffixSeqWordInit s
in skip $ SplitOnSuffixSeqYield b next
FL.Done b -> yieldProceed jump b
Skip s -> go SPEC wrd s fs
Stop ->
if wrd .&. wordMask == wordPat
@ -1962,20 +2075,20 @@ splitOnSuffixSeq withSep patArr (Fold fstep initial done) (Stream step state) =
-- General Pattern - Karp Rabin
-------------------------------
stepOuter gst (SplitOnSuffixSeqKRInit idx0 st0 rb rh0) = do
stepOuter gst (SplitOnSuffixSeqKRInit idx0 fs st0 rb rh0) = do
res <- step (adaptState gst) st0
case res of
Yield x s -> do
rh1 <- liftIO $ RB.unsafeInsert rb rh0 x
fs <- initial
r <- if withSep then fstep fs x else return $ FL.Partial fs
case r of
FL.Partial fs1 ->
skip $ SplitOnSuffixSeqKRInit1 fs1 s rb rh1
FL.Done b ->
let next = SplitOnSuffixSeqKRInit 0 s rb (RB.startOf rb)
in skip $ SplitOnSuffixSeqYield b next
Skip s -> skip $ SplitOnSuffixSeqKRInit idx0 s rb rh0
FL.Done b -> do
let rst = RB.startOf rb
jump c = SplitOnSuffixSeqKRInit 0 c s rb rst
yieldProceed jump b
Skip s -> skip $ SplitOnSuffixSeqKRInit idx0 fs s rb rh0
Stop -> return Stop
stepOuter gst (SplitOnSuffixSeqKRInit1 fs0 st0 rb rh0) = do
@ -2000,10 +2113,10 @@ splitOnSuffixSeq withSep patArr (Fold fstep initial done) (Stream step state) =
then SplitOnSuffixSeqKRCheck fs1 s rb rh1
else SplitOnSuffixSeqKRLoop
fs1 s rb rh1 ringHash
FL.Done b ->
let next = SplitOnSuffixSeqKRInit
0 st rb (RB.startOf rb)
in skip $ SplitOnSuffixSeqYield b next
FL.Done b -> do
let rst = RB.startOf rb
jump c = SplitOnSuffixSeqKRInit 0 c s rb rst
yieldProceed jump b
Skip s -> go SPEC idx rh s fs
Stop -> do
-- do not issue a blank segment when we end at pattern
@ -2033,10 +2146,10 @@ splitOnSuffixSeq withSep patArr (Fold fstep initial done) (Stream step state) =
if (cksum1 /= patHash)
then go SPEC fs1 s rh1 cksum1
else skip $ SplitOnSuffixSeqKRCheck fs1 s rb rh1
FL.Done b ->
let next = SplitOnSuffixSeqKRInit
0 st rb (RB.startOf rb)
in skip $ SplitOnSuffixSeqYield b next
FL.Done b -> do
let rst = RB.startOf rb
jump c = SplitOnSuffixSeqKRInit 0 c s rb rst
yieldProceed jump b
Skip s -> go SPEC fs s rh cksum
Stop ->
if RB.unsafeEqArray rb rh patArr
@ -2051,8 +2164,9 @@ splitOnSuffixSeq withSep patArr (Fold fstep initial done) (Stream step state) =
if RB.unsafeEqArray rb rh patArr
then do
r <- done fs
let next = SplitOnSuffixSeqKRInit 0 st rb (RB.startOf rb)
skip $ SplitOnSuffixSeqYield r next
let rst = RB.startOf rb
jump c = SplitOnSuffixSeqKRInit 0 c st rb rst
yieldProceed jump r
else skip $ SplitOnSuffixSeqKRLoop fs st rb rh patHash
stepOuter _ (SplitOnSuffixSeqKRDone 0 fs _ _) = do
@ -2065,9 +2179,8 @@ splitOnSuffixSeq withSep patArr (Fold fstep initial done) (Stream step state) =
case r of
FL.Partial fs1 -> skip $ SplitOnSuffixSeqKRDone (n - 1) fs1 rb rh1
FL.Done b -> do
fs1 <- initial
let next = SplitOnSuffixSeqKRDone (n - 1) fs1 rb rh1
skip $ SplitOnSuffixSeqYield b next
let jump c = SplitOnSuffixSeqKRDone (n - 1) c rb rh1
yieldProceed jump b
------------------------------------------------------------------------------
-- Nested Container Transformation

View File

@ -258,12 +258,15 @@ tap (Fold fstep initial extract) (Stream step state) = Stream step' TapInit
where
step' _ TapInit = do
r <- initial
return $ Skip (Tapping r state)
res <- initial
return
$ Skip
$ case res of
FL.Partial s -> Tapping s state
FL.Done _ -> TapDone state
step' gst (Tapping acc st) = do
r <- step gst st
case r of
-- XXX Abstract Yield?
Yield x s -> do
res <- fstep acc x
return
@ -299,8 +302,12 @@ tapOffsetEvery offset n (Fold fstep initial extract) (Stream step state) =
{-# INLINE_LATE step' #-}
step' _ TapOffInit = do
r <- initial
return $ Skip $ TapOffTapping r state (offset `mod` n)
res <- initial
return
$ Skip
$ case res of
FL.Partial s -> TapOffTapping s state (offset `mod` n)
FL.Done _ -> TapOffDone state
step' gst (TapOffTapping acc st count) = do
r <- step gst st
case r of
@ -478,35 +485,65 @@ tapAsync f (Stream step1 state1) = Stream step TapInit
-- Scanning with a Fold
------------------------------------------------------------------------------
data PostScanState s f = PostScan s !f
data ScanState s f = ScanInit s | ScanDo s !f | ScanDone
{-# INLINE_NORMAL postscanOnce #-}
postscanOnce :: Monad m => FL.Fold m a b -> Stream m a -> Stream m b
postscanOnce (FL.Fold fstep begin done) (Stream step state) =
Stream step' (PostScan state begin)
postscanOnce (FL.Fold fstep initial extract) (Stream sstep state) =
Stream step (ScanInit state)
where
{-# INLINE_LATE step' #-}
step' gst (PostScan st acc) = do
r <- step (adaptState gst) st
case r of
{-# INLINE_LATE step #-}
step _ (ScanInit st) = do
res <- initial
return
$ case res of
FL.Partial fs -> Skip $ ScanDo st fs
FL.Done b -> Yield b ScanDone
step gst (ScanDo st fs) = do
res <- sstep (adaptState gst) st
case res of
Yield x s -> do
old <- acc
res <- fstep old x
case res of
FL.Partial fs -> do
!v <- done fs
return $ Yield v $ PostScan s (return fs)
FL.Done _ -> return Stop
Skip s -> return $ Skip $ PostScan s acc
r <- fstep fs x
case r of
FL.Partial fs1 -> do
!b <- extract fs1
return $ Yield b $ ScanDo s fs1
FL.Done b -> return $ Yield b ScanDone
Skip s -> return $ Skip $ ScanDo s fs
Stop -> return Stop
step _ ScanDone = return Stop
{-# INLINE scanOnce #-}
scanOnce :: Monad m
=> FL.Fold m a b -> Stream m a -> Stream m b
scanOnce fld@(FL.Fold _ begin done) s =
(begin >>= \x -> x `seq` done x) `consM` postscanOnce fld s
scanOnce (FL.Fold fstep initial extract) (Stream sstep state) =
Stream step (ScanInit state)
where
{-# INLINE_LATE step #-}
step _ (ScanInit st) = do
res <- initial
case res of
FL.Partial fs -> do
!b <- extract fs
return $ Yield b $ ScanDo st fs
FL.Done b -> return $ Yield b ScanDone
step gst (ScanDo st fs) = do
res <- sstep (adaptState gst) st
case res of
Yield x s -> do
r <- fstep fs x
case r of
FL.Partial fs1 -> do
!b <- extract fs1
return $ Yield b $ ScanDo s fs1
FL.Done b -> return $ Yield b ScanDone
Skip s -> return $ Skip $ ScanDo s fs
Stop -> return Stop
step _ ScanDone = return Stop
------------------------------------------------------------------------------
-- Scanning - Prescans

View File

@ -388,8 +388,11 @@ toStreamD = fromStreamK . K.toStream
{-# INLINE_NORMAL foldOnce #-}
foldOnce :: (Monad m) => Fold m a b -> Stream m a -> m b
foldOnce (Fold fstep begin done) (Stream step state) =
begin >>= \x -> go SPEC x state
foldOnce (Fold fstep begin done) (Stream step state) = do
res <- begin
case res of
FL.Partial fs -> go SPEC fs state
FL.Done fb -> return fb
where
@ -942,6 +945,15 @@ data GroupState s fs b a
| GroupYield b (GroupState s fs b a)
| GroupFinish
-- | This is the stream equivalent of "Data.Fold.Internal.many". The fold
-- may consume 0 or more elements. It means:
--
-- * If the stream is empty the default value of the fold would still be
-- emitted in the output.
-- * At the end of the stream if the last application of the fold did not
-- receive any input it would still yield the default fold accumulator as the
-- last value.
--
{-# INLINE_NORMAL foldMany #-}
foldMany :: Monad m => Fold m a b -> Stream m a -> Stream m b
foldMany (Fold fstep initial extract) (Stream step state) =
@ -951,16 +963,21 @@ foldMany (Fold fstep initial extract) (Stream step state) =
{-# INLINE_LATE step' #-}
step' _ (GroupStart st) = do
-- fs = fold state
fs <- initial
return $ Skip (GroupBuffer st fs)
r <- initial
return
$ Skip
$ case r of
FL.Done b -> GroupYield b (GroupStart st)
FL.Partial fs -> GroupBuffer st fs
-- This state is not strictly required but it helps the compiler fuse the
-- code.
step' _ (GroupConsume st fs x) = do
fs' <- fstep fs x
case fs' of
FL.Done b -> return $ Skip (GroupYield b (GroupStart st))
FL.Partial ps -> return $ Skip (GroupBuffer st ps)
r <- fstep fs x
return
$ Skip
$ case r of
FL.Done b -> GroupYield b (GroupStart st)
FL.Partial fs1 -> GroupBuffer st fs1
step' gst (GroupBuffer st fs) = do
r <- step (adaptState gst) st
case r of
@ -972,37 +989,70 @@ foldMany (Fold fstep initial extract) (Stream step state) =
step' _ (GroupYield b next) = return $ Yield b next
step' _ GroupFinish = return Stop
{-# ANN type FoldMany1 Fuse #-}
data FoldMany1 s fs b a
= FoldMany1Start s
| FoldMany1First fs s
| FoldMany1Consume s fs a
| FoldMany1Gen s fs
| FoldMany1Yield b (FoldMany1 s fs b a)
| FoldMany1Done
-- | Like 'foldMany' except that the fold consumes 1 or more elements. It
-- means:
--
-- * If the stream is empty the output would be empty.
-- * At the end of the stream if the last application of the fold did not
-- receive any input it would not result in any output.
--
-- @foldMany1 f = parseMany (fromFold f)@
--
-- This could be problematic unless we use an accumulator fold instead of a
-- terminating fold. A terminating fold may terminate even without accepting a
-- single element. We can use parseMany instead where we can return the input
-- if the fold terminates without accepting any input. The alternative is to
-- always execute the fold's initial action and discard it in case the stream
-- stops without actually feeding input to it.
--
{-# INLINE_NORMAL foldMany1 #-}
foldMany1 :: Monad m => Fold m a b -> Stream m a -> Stream m b
foldMany1 (Fold fstep initial extract) (Stream step state) =
Stream step' (GroupStart state)
Stream step' (FoldMany1Start state)
where
{-# INLINE_LATE step' #-}
step' gst (GroupStart st) = do
step' _ (FoldMany1Start st) = do
r <- initial
return
$ Skip
$ case r of
FL.Done b -> FoldMany1Yield b (FoldMany1Start st)
FL.Partial fs -> FoldMany1First fs st
step' gst (FoldMany1First fs st) = do
r <- step (adaptState gst) st
case r of
Yield x s -> do
fi <- initial
return $ Skip $ GroupConsume s fi x
Skip s -> return $ Skip (GroupStart s)
Stop -> return $ Stop
step' _ (GroupConsume st fs x) = do
fs' <- fstep fs x
case fs' of
FL.Done b -> return $ Skip (GroupYield b (GroupStart st))
FL.Partial ps -> return $ Skip (GroupBuffer st ps)
step' gst (GroupBuffer st fs) = do
return $ Skip $ FoldMany1Consume s fs x
Skip s -> return $ Skip (FoldMany1Start s)
Stop -> return Stop
step' _ (FoldMany1Consume st fs x) = do
r <- fstep fs x
return
$ Skip
$ case r of
FL.Done b -> FoldMany1Yield b (FoldMany1Start st)
FL.Partial fs1 -> FoldMany1Gen st fs1
step' gst (FoldMany1Gen st fs) = do
r <- step (adaptState gst) st
case r of
Yield x s -> return $ Skip $ GroupConsume s fs x
Skip s -> return $ Skip (GroupBuffer s fs)
Yield x s -> return $ Skip $ FoldMany1Consume s fs x
Skip s -> return $ Skip (FoldMany1Gen s fs)
Stop -> do
b <- extract fs
return $ Skip (GroupYield b GroupFinish)
step' _ (GroupYield b next) = return $ Yield b next
step' _ GroupFinish = return Stop
return $ Skip (FoldMany1Yield b FoldMany1Done)
step' _ (FoldMany1Yield b next) = return $ Yield b next
step' _ FoldMany1Done = return Stop
data GroupState2 s fs
= GroupStart2 s

View File

@ -436,19 +436,22 @@ foldlMx' step begin done m = go begin m
{-# INLINABLE foldOnce #-}
foldOnce :: (IsStream t, Monad m) => FL.Fold m a b -> t m a -> m b
foldOnce (FL.Fold step begin done) = go begin
foldOnce (FL.Fold step begin done) m = do
res <- begin
case res of
FL.Partial fs -> go fs m
FL.Done fb -> return fb
where
go !acc m1 =
let stop = acc >>= done
single a = acc
>>= \b -> step b a
let stop = done acc
single a = step acc a
>>= \case
FL.Partial s -> done s
FL.Done b1 -> return b1
yieldk a r = acc
>>= \b -> step b a
yieldk a r = step acc a
>>= \case
FL.Partial s -> go (return s) r
FL.Partial s -> go s r
FL.Done b1 -> return b1
in foldStream defState yieldk single stop m1

View File

@ -276,8 +276,11 @@ swap = lmap Tuple.swap
--
{-# INLINE_NORMAL fold #-}
fold :: Monad m => Unfold m a b -> Fold m b c -> a -> m c
fold (Unfold ustep inject) (Fold fstep initial extract) a =
initial >>= \x -> inject a >>= go SPEC x
fold (Unfold ustep inject) (Fold fstep initial extract) a = do
res <- initial
case res of
FL.Partial x -> inject a >>= go SPEC x
FL.Done b -> return b
where

View File

@ -357,13 +357,16 @@ writeChunks path = Fold step initial extract
h <- liftIO (openFile path WriteMode)
fld <- FL.initialize (FH.writeChunks h)
`MC.onException` liftIO (hClose h)
return (fld, h)
return $ FL.Partial (fld, h)
step (fld, h) x = do
r <- FL.runStep fld x `MC.onException` liftIO (hClose h)
return $ FL.Partial (r, h)
extract (Fold _ initial1 extract1, h) = do
liftIO $ hClose h
initial1 >>= extract1
res <- initial1
case res of
FL.Partial fs -> extract1 fs
FL.Done fb -> return fb
-- | @writeWithBufferOf chunkSize handle@ writes the input stream to @handle@.
-- Bytes in the input stream are collected into a buffer until we have a chunk

View File

@ -357,13 +357,16 @@ writeChunks addr port = Fold step initial extract
initial = do
skt <- liftIO (connect addr port)
fld <- FL.initialize (SK.writeChunks skt) `MC.onException` liftIO (Net.close skt)
return (Tuple' fld skt)
return $ FL.Partial (Tuple' fld skt)
step (Tuple' fld skt) x = do
r <- FL.runStep fld x `MC.onException` liftIO (Net.close skt)
return $ FL.Partial (Tuple' r skt)
extract (Tuple' (Fold _ initial1 extract1) skt) = do
liftIO $ Net.close skt
initial1 >>= extract1
res <- initial1
case res of
FL.Partial fs -> extract1 fs
FL.Done fb -> return fb
-- | Like 'write' but provides control over the write buffer. Output will
-- be written to the IO device as soon as we collect the specified number of

View File

@ -7,7 +7,7 @@ import Test.Hspec (Spec, hspec, describe)
import Test.Hspec.QuickCheck
import Test.QuickCheck
(arbitrary, forAll, elements, Property, property, listOf,
vectorOf, Gen, suchThat)
vectorOf, Gen)
import Test.QuickCheck.Monadic (monadicIO, assert, run)
import Prelude hiding (sequence)
@ -490,9 +490,9 @@ many :: Property
many =
forAll (listOf (chooseInt (0, 1))) $ \ls ->
let fldstp conL currL = return $ FL.Partial $ conL ++ currL
concatFold = FL.Fold fldstp (return []) return
prsr = P.many concatFold
$ P.fromFold $ FL.sliceSepBy (== 1) FL.toList
concatFold = FL.Fold fldstp (return (FL.Partial [])) return
prsr =
P.many concatFold $ P.fromFold $ FL.sliceSepBy (== 1) FL.toList
in
case S.parse prsr (S.fromList ls) of
Right res_list -> checkListEqual res_list
@ -510,13 +510,10 @@ some =
forAll (listOf (chooseInt (0, 1))) $ \genLs ->
let
ls = 0 : genLs
concatFold = FL.Fold
(\concatList curr_list ->
return $ FL.Partial $ concatList ++ curr_list)
(return [])
return
prsr = P.some concatFold
$ P.fromFold $ FL.sliceSepBy (== 1) FL.toList
fldstp conL currL = return $ FL.Partial $ conL ++ currL
concatFold = FL.Fold fldstp (return (FL.Partial [])) return
prsr =
P.some concatFold $ P.fromFold $ FL.sliceSepBy (== 1) FL.toList
in
case S.parse prsr (S.fromList ls) of
Right res_list -> res_list == Prelude.filter (== 0) ls
@ -532,11 +529,9 @@ some =
-- Instances
-------------------------------------------------------------------------------
-- XXX Remove "`suchThat` (\x -> length x > 0)) $ \ list1 ->" once FL.ltake is
-- fixed.
applicative :: Property
applicative =
forAll (listOf (chooseAny :: Gen Int) `suchThat` (\x -> length x > 0)) $ \ list1 ->
forAll (listOf (chooseAny :: Gen Int)) $ \ list1 ->
forAll (listOf (chooseAny :: Gen Int)) $ \ list2 ->
let parser =
(,)
@ -548,32 +543,26 @@ applicative =
listEquals (==) olist1 list1
listEquals (==) olist2 list2
-- XXX Remove "`suchThat` (\x -> length x > 0)) $ \ list1 ->" once FL.ltake is
-- fixed.
sequence :: Property
sequence =
forAll (vectorOf 11 (listOf (chooseAny :: Gen Int) `suchThat`
(\x -> length x > 0))) $ \ ins ->
let p xs = P.fromFold (FL.ltake (length xs) FL.toList)
in monadicIO $ do
forAll (vectorOf 11 (listOf (chooseAny :: Gen Int))) $ \ ins ->
let p xs = P.fromFold (FL.ltake (length xs) FL.toList)
in monadicIO $ do
outs <- run $
S.parse
(Prelude.sequence $ fmap p ins)
(S.fromList $ concat ins)
listEquals (==) outs ins
-- XXX Remove "`suchThat` (\x -> length x > 0)) $ \ list1 ->" once FL.ltake is
-- fixed.
monad :: Property
monad =
forAll (listOf (chooseAny :: Gen Int) `suchThat`
(\x -> length x > 0)) $ \ list1 ->
forAll (listOf (chooseAny :: Gen Int)) $ \ list2 ->
let parser = do
olist1 <- P.fromFold (FL.ltake (length list1) FL.toList)
olist2 <- P.fromFold (FL.ltake (length list2) FL.toList)
return (olist1, olist2)
in monadicIO $ do
forAll (listOf (chooseAny :: Gen Int)) $ \ list1 ->
forAll (listOf (chooseAny :: Gen Int)) $ \ list2 ->
let parser = do
olist1 <- P.fromFold (FL.ltake (length list1) FL.toList)
olist2 <- P.fromFold (FL.ltake (length list2) FL.toList)
return (olist1, olist2)
in monadicIO $ do
(olist1, olist2) <-
run $ S.parse parser (S.fromList $ list1 ++ list2)
listEquals (==) olist1 list1

View File

@ -7,7 +7,7 @@ import Test.Hspec (Spec, hspec, describe)
import Test.Hspec.QuickCheck
import Test.QuickCheck
(arbitrary, forAll, elements, Property,
property, listOf, vectorOf, (.&&.), Gen, suchThat)
property, listOf, vectorOf, (.&&.), Gen)
import Test.QuickCheck.Monadic (monadicIO, assert, run)
import qualified Data.List as List
@ -492,8 +492,10 @@ many =
$ \ls ->
let fldstp conL currL = return $ FL.Partial (conL ++ currL)
concatFold =
FL.Fold fldstp (return []) return
prsr = P.many concatFold $ P.fromFold $ FL.sliceSepBy (== 1) FL.toList
FL.Fold fldstp (return (FL.Partial [])) return
prsr =
P.many concatFold
$ P.fromFold $ FL.sliceSepBy (== 1) FL.toList
in case S.parseD prsr (S.fromList ls) of
Right res_list ->
checkListEqual res_list (Prelude.filter (== 0) ls)
@ -510,8 +512,10 @@ some =
forAll (listOf (chooseInt (0, 1)))
$ \ls ->
let fldstp conL currL = return $ FL.Partial $ conL ++ currL
concatFold = FL.Fold fldstp (return []) return
prsr = P.some concatFold $ P.fromFold $ FL.sliceSepBy (== 1) FL.toList
concatFold = FL.Fold fldstp (return (FL.Partial [])) return
prsr =
P.some concatFold
$ P.fromFold $ FL.sliceSepBy (== 1) FL.toList
in case S.parseD prsr (S.fromList ls) of
Right res_list -> res_list == Prelude.filter (== 0) ls
Left _ -> False
@ -526,11 +530,9 @@ someFail =
-- Instances
-------------------------------------------------------------------------------
-- XXX Remove "`suchThat` (\x -> length x > 0)) $ \ list1 ->" once FL.ltake is
-- fixed.
applicative :: Property
applicative =
forAll (listOf (chooseAny :: Gen Int) `suchThat` (\x -> length x > 0)) $ \ list1 ->
forAll (listOf (chooseAny :: Gen Int)) $ \ list1 ->
forAll (listOf (chooseAny :: Gen Int)) $ \ list2 ->
let parser =
(,)
@ -542,11 +544,9 @@ applicative =
listEquals (==) olist1 list1
listEquals (==) olist2 list2
-- XXX Remove "`suchThat` (\x -> length x > 0)) $ \ list1 ->" once FL.ltake is
-- fixed.
sequence :: Property
sequence =
forAll (vectorOf 11 (listOf (chooseAny :: Gen Int) `suchThat` (\x -> length x > 0))) $ \ ins ->
forAll (vectorOf 11 (listOf (chooseAny :: Gen Int))) $ \ ins ->
let parsers = fmap (\xs -> P.fromFold $ FL.ltake (length xs) FL.toList) ins
in monadicIO $ do
outs <- run $
@ -555,11 +555,9 @@ sequence =
(S.fromList $ concat ins)
listEquals (==) outs ins
-- XXX Remove "`suchThat` (\x -> length x > 0)) $ \ list1 ->" once FL.ltake is
-- fixed.
monad :: Property
monad =
forAll (listOf (chooseAny :: Gen Int) `suchThat` (\x -> length x > 0)) $ \ list1 ->
forAll (listOf (chooseAny :: Gen Int)) $ \ list1 ->
forAll (listOf (chooseAny :: Gen Int)) $ \ list2 ->
let parser = do
olist1 <- P.fromFold (FL.ltake (length list1) FL.toList)

View File

@ -1168,8 +1168,8 @@ transformCombineOpsCommon constr desc eq t = do
withMaxSuccess maxTestCount $
monadicIO $ do
cref <- run $ newIORef 0
let fldstp _ e = FL.Partial <$> modifyIORef' cref (e+)
sumfoldinref = FL.Fold fldstp (return ()) (const $ return ())
let fldstp _ e = modifyIORef' cref (e +)
sumfoldinref = FL.mkAccumM_ fldstp (return ())
op = S.tap sumfoldinref . S.mapM (\x -> return (x+1))
listOp = fmap (+1)
stream <- run ((S.toList . t) $ op (constr a <> constr b))