Rename the CPS stream type to StreamK (#2269)

This commit is contained in:
Ranjeet Ranjan 2023-03-02 12:03:44 +05:30 committed by GitHub
parent 34b2b49876
commit 7252323b76
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
28 changed files with 403 additions and 401 deletions

View File

@ -128,10 +128,10 @@ unCross = id
type MonadAsync = Monad
#ifdef USE_STREAMK
mkCross :: StreamK.Stream m a -> StreamK.CrossStreamK m a
mkCross :: StreamK m a -> StreamK.CrossStreamK m a
mkCross = StreamK.mkCross
unCross :: StreamK.CrossStreamK m a -> StreamK.Stream m a
unCross :: StreamK.CrossStreamK m a -> StreamK m a
unCross = StreamK.unCross
#else
mkCross :: Stream m a -> Stream.CrossStream m a

View File

@ -116,7 +116,7 @@ Benchmarks that need to be added
-- Stream generation and elimination
-------------------------------------------------------------------------------
type Stream m a = S.Stream m a
type Stream m a = S.StreamK m a
{-# INLINE unfoldr #-}
unfoldr :: Int -> Int -> Stream m Int
@ -501,7 +501,7 @@ concatMapBySerial outer inner n =
-- Nested Composition
-------------------------------------------------------------------------------
instance Monad m => Applicative (S.Stream m) where
instance Monad m => Applicative (S.StreamK m) where
{-# INLINE pure #-}
pure = S.fromPure
@ -520,7 +520,7 @@ instance Monad m => Applicative (S.Stream m) where
-- NOTE: even though concatMap for StreamD is 3x faster compared to StreamK,
-- the monad instance of StreamD is slower than StreamK after foldr/build
-- fusion.
instance Monad m => Monad (S.Stream m) where
instance Monad m => Monad (S.StreamK m) where
{-# INLINE return #-}
return = pure

View File

@ -498,7 +498,7 @@ toStreamD arr@Array{..} =
D.mapM (`getIndexUnsafe` arr) $ D.enumerateFromToIntegral 0 (arrLen - 1)
{-# INLINE toStreamK #-}
toStreamK :: MonadIO m => Array a -> K.Stream m a
toStreamK :: MonadIO m => Array a -> K.StreamK m a
toStreamK arr@Array{..} = K.unfoldrM step 0
where

View File

@ -244,6 +244,7 @@ import GHC.Ptr (Ptr(..))
import Streamly.Internal.Data.Fold.Type (Fold(..))
import Streamly.Internal.Data.Producer.Type (Producer (..))
import Streamly.Internal.Data.Stream.StreamD.Type (Stream)
import Streamly.Internal.Data.Stream.StreamK.Type (StreamK)
import Streamly.Internal.Data.SVar.Type (adaptState, defState)
import Streamly.Internal.Data.Unfold.Type (Unfold(..))
import Streamly.Internal.System.IO (arrayPayloadSize, defaultChunkSize)
@ -1339,7 +1340,7 @@ arraysOf n (D.Stream step state) =
-- | Buffer the stream into arrays in memory.
{-# INLINE arrayStreamKFromStreamD #-}
arrayStreamKFromStreamD :: forall m a. (MonadIO m, Unbox a) =>
D.Stream m a -> m (K.Stream m (MutArray a))
D.Stream m a -> m (StreamK m (MutArray a))
arrayStreamKFromStreamD =
let n = allocBytesToElemCount (undefined :: a) defaultChunkSize
in D.foldr K.cons K.nil . arraysOf n
@ -1554,7 +1555,7 @@ toStreamD = toStreamDWith liftIO
{-# INLINE toStreamKWith #-}
toStreamKWith ::
forall m a. (Monad m, Unbox a)
=> (forall b. IO b -> m b) -> MutArray a -> K.Stream m a
=> (forall b. IO b -> m b) -> MutArray a -> StreamK m a
toStreamKWith liftio MutArray{..} = go arrStart
where
@ -1565,7 +1566,7 @@ toStreamKWith liftio MutArray{..} = go arrStart
in liftio elemM `K.consM` go (INDEX_NEXT(p,a))
{-# INLINE toStreamK #-}
toStreamK :: forall m a. (MonadIO m, Unbox a) => MutArray a -> K.Stream m a
toStreamK :: forall m a. (MonadIO m, Unbox a) => MutArray a -> StreamK m a
toStreamK = toStreamKWith liftIO
{-# INLINE_NORMAL toStreamDRevWith #-}
@ -1596,7 +1597,7 @@ toStreamDRev = toStreamDRevWith liftIO
{-# INLINE toStreamKRevWith #-}
toStreamKRevWith ::
forall m a. (Monad m, Unbox a)
=> (forall b. IO b -> m b) -> MutArray a -> K.Stream m a
=> (forall b. IO b -> m b) -> MutArray a -> StreamK m a
toStreamKRevWith liftio MutArray {..} =
let p = INDEX_PREV(arrEnd,a)
in go p
@ -1609,7 +1610,7 @@ toStreamKRevWith liftio MutArray {..} =
in liftio elemM `K.consM` go (INDEX_PREV(p,a))
{-# INLINE toStreamKRev #-}
toStreamKRev :: forall m a. (MonadIO m, Unbox a) => MutArray a -> K.Stream m a
toStreamKRev :: forall m a. (MonadIO m, Unbox a) => MutArray a -> StreamK m a
toStreamKRev = toStreamKRevWith liftIO
-------------------------------------------------------------------------------
@ -1836,7 +1837,7 @@ writeNAligned align = writeNWith (newAlignedPinned align)
--
{-# INLINE_NORMAL writeChunks #-}
writeChunks :: (MonadIO m, Unbox a) =>
Int -> Fold m a (K.Stream n (MutArray a))
Int -> Fold m a (StreamK n (MutArray a))
writeChunks n = FL.many (writeN n) FL.toStreamK
-- XXX Compare writeWith with fromStreamD which uses an array of streams
@ -1946,7 +1947,7 @@ fromListRevN n xs = D.fold (writeRevN n) $ D.fromList xs
-------------------------------------------------------------------------------
{-# INLINE arrayStreamKLength #-}
arrayStreamKLength :: (Monad m, Unbox a) => K.Stream m (MutArray a) -> m Int
arrayStreamKLength :: (Monad m, Unbox a) => StreamK m (MutArray a) -> m Int
arrayStreamKLength as = K.foldl' (+) 0 (K.map length as)
-- | Convert an array stream to an array. Note that this requires peak memory
@ -1954,7 +1955,7 @@ arrayStreamKLength as = K.foldl' (+) 0 (K.map length as)
--
{-# INLINE fromArrayStreamK #-}
fromArrayStreamK :: (Unbox a, MonadIO m) =>
K.Stream m (MutArray a) -> m (MutArray a)
StreamK m (MutArray a) -> m (MutArray a)
fromArrayStreamK as = do
len <- arrayStreamKLength as
fromStreamDN len $ D.unfoldMany reader $ D.fromStreamK as

View File

@ -262,7 +262,7 @@ fromStreamD str = unsafeFreeze <$> MA.fromStreamD str
{-# INLINE bufferChunks #-}
bufferChunks :: (MonadIO m, Unbox a) =>
D.Stream m a -> m (K.Stream m (Array a))
D.Stream m a -> m (K.StreamK m (Array a))
bufferChunks m = D.foldr K.cons K.nil $ arraysOf defaultChunkSize m
-- | @arraysOf n stream@ groups the elements in the input stream into arrays of
@ -348,7 +348,7 @@ toStreamD :: forall m a. (Monad m, Unbox a) => Array a -> D.Stream m a
toStreamD arr = MA.toStreamDWith (return . unsafeInlineIO) (unsafeThaw arr)
{-# INLINE toStreamK #-}
toStreamK :: forall m a. (Monad m, Unbox a) => Array a -> K.Stream m a
toStreamK :: forall m a. (Monad m, Unbox a) => Array a -> K.StreamK m a
toStreamK arr = MA.toStreamKWith (return . unsafeInlineIO) (unsafeThaw arr)
{-# INLINE_NORMAL toStreamDRev #-}
@ -357,7 +357,7 @@ toStreamDRev arr =
MA.toStreamDRevWith (return . unsafeInlineIO) (unsafeThaw arr)
{-# INLINE toStreamKRev #-}
toStreamKRev :: forall m a. (Monad m, Unbox a) => Array a -> K.Stream m a
toStreamKRev :: forall m a. (Monad m, Unbox a) => Array a -> K.StreamK m a
toStreamKRev arr =
MA.toStreamKRevWith (return . unsafeInlineIO) (unsafeThaw arr)

View File

@ -688,7 +688,7 @@ toList = foldr' (:) []
-- xn : ... : x2 : x1 : []
{-# INLINE toStreamKRev #-}
toStreamKRev :: Monad m => Fold m a (K.Stream n a)
toStreamKRev :: Monad m => Fold m a (K.StreamK n a)
toStreamKRev = foldl' (flip K.cons) K.nil
-- | A fold that buffers its input to a pure stream.
@ -698,7 +698,7 @@ toStreamKRev = foldl' (flip K.cons) K.nil
--
-- /Internal/
{-# INLINE toStreamK #-}
toStreamK :: Monad m => Fold m a (K.Stream n a)
toStreamK :: Monad m => Fold m a (K.StreamK n a)
toStreamK = foldr K.cons K.nil
------------------------------------------------------------------------------

View File

@ -367,7 +367,7 @@ foldBreakD (FL.Fold fstep initial extract) stream@(D.Stream step state) = do
{-# INLINE_NORMAL foldBreakK #-}
foldBreakK :: forall m a b. (MonadIO m, Unbox a) =>
Fold m a b -> K.Stream m (Array a) -> m (b, K.Stream m (Array a))
Fold m a b -> K.StreamK m (Array a) -> m (b, K.StreamK m (Array a))
foldBreakK (FL.Fold fstep initial extract) stream = do
res <- initial
case res of
@ -644,8 +644,8 @@ parseBreakD
parseBreakK ::
forall m a b. (MonadIO m, Unbox a)
=> PRD.Parser a m b
-> K.Stream m (Array.Array a)
-> m (Either ParseError b, K.Stream m (Array.Array a))
-> K.StreamK m (Array.Array a)
-> m (Either ParseError b, K.StreamK m (Array.Array a))
parseBreakK (PRD.Parser pstep initial extract) stream = do
res <- initial
case res of

View File

@ -50,7 +50,7 @@ import Prelude hiding (foldr, repeat)
-- 'K.fromFoldable' for serial streams.
--
{-# INLINE_EARLY fromList #-}
fromList :: Monad m => [a] -> K.Stream m a
fromList :: Monad m => [a] -> K.StreamK m a
fromList = D.toStreamK . D.fromList
{-# RULES "fromList fallback to StreamK" [1]
forall a. D.toStreamK (D.fromList a) = K.fromFoldable a #-}
@ -58,7 +58,7 @@ fromList = D.toStreamK . D.fromList
-- | Convert a stream into a list in the underlying monad.
--
{-# INLINE toList #-}
toList :: Monad m => K.Stream m a -> m [a]
toList :: Monad m => K.StreamK m a -> m [a]
toList m = D.toList $ D.fromStreamK m
------------------------------------------------------------------------------
@ -66,23 +66,23 @@ toList m = D.toList $ D.fromStreamK m
------------------------------------------------------------------------------
{-# INLINE foldrM #-}
foldrM :: Monad m => (a -> m b -> m b) -> m b -> K.Stream m a -> m b
foldrM :: Monad m => (a -> m b -> m b) -> m b -> K.StreamK m a -> m b
foldrM step acc m = D.foldrM step acc $ D.fromStreamK m
{-# INLINE foldr #-}
foldr :: Monad m => (a -> b -> b) -> b -> K.Stream m a -> m b
foldr :: Monad m => (a -> b -> b) -> b -> K.StreamK m a -> m b
foldr f z = foldrM (\a b -> f a <$> b) (return z)
-- | Strict left associative fold.
--
{-# INLINE foldl' #-}
foldl' ::
Monad m => (b -> a -> b) -> b -> K.Stream m a -> m b
Monad m => (b -> a -> b) -> b -> K.StreamK m a -> m b
foldl' step begin m = D.foldl' step begin $ D.fromStreamK m
{-# INLINE fold #-}
fold :: Monad m => Fold m a b -> K.Stream m a -> m b
fold :: Monad m => Fold m a b -> K.StreamK m a -> m b
fold fld m = D.fold fld $ D.fromStreamK m
------------------------------------------------------------------------------
@ -93,7 +93,7 @@ fold fld m = D.fold fld $ D.fromStreamK m
--
{-# INLINE eqBy #-}
eqBy :: Monad m =>
(a -> b -> Bool) -> K.Stream m a -> K.Stream m b -> m Bool
(a -> b -> Bool) -> K.StreamK m a -> K.StreamK m b -> m Bool
eqBy f m1 m2 = D.eqBy f (D.fromStreamK m1) (D.fromStreamK m2)
-- | Compare two streams
@ -101,5 +101,5 @@ eqBy f m1 m2 = D.eqBy f (D.fromStreamK m1) (D.fromStreamK m2)
{-# INLINE cmpBy #-}
cmpBy
:: Monad m
=> (a -> b -> Ordering) -> K.Stream m a -> K.Stream m b -> m Ordering
=> (a -> b -> Ordering) -> K.StreamK m a -> K.StreamK m b -> m Ordering
cmpBy f m1 m2 = D.cmpBy f (D.fromStreamK m1) (D.fromStreamK m2)

View File

@ -186,7 +186,7 @@ import qualified Streamly.Internal.Data.Unfold.Type as Unfold
-- | A stream consists of a step function that generates the next step given a
-- current state, and the current state.
data Stream m a =
forall s. UnStream (State K.Stream m a -> s -> m (Step s a)) s
forall s. UnStream (State K.StreamK m a -> s -> m (Step s a)) s
-- XXX This causes perf trouble when pattern matching with "Stream" in a
-- recursive way, e.g. in uncons, foldBreak, concatMap. We need to get rid of
@ -195,7 +195,7 @@ unShare :: Stream m a -> Stream m a
unShare (UnStream step state) = UnStream step' state
where step' gst = step (adaptState gst)
pattern Stream :: (State K.Stream m a -> s -> m (Step s a)) -> s -> Stream m a
pattern Stream :: (State K.StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
pattern Stream step state <- (unShare -> UnStream step state)
where Stream = UnStream
@ -361,7 +361,7 @@ fromList = Stream step
-- | Convert a CPS encoded StreamK to direct style step encoded StreamD
{-# INLINE_LATE fromStreamK #-}
fromStreamK :: Applicative m => K.Stream m a -> Stream m a
fromStreamK :: Applicative m => K.StreamK m a -> Stream m a
fromStreamK = Stream step
where
step gst m1 =
@ -372,7 +372,7 @@ fromStreamK = Stream step
-- | Convert a direct style step encoded StreamD to a CPS encoded StreamK
{-# INLINE_LATE toStreamK #-}
toStreamK :: Monad m => Stream m a -> K.Stream m a
toStreamK :: Monad m => Stream m a -> K.StreamK m a
toStreamK (Stream step state) = go state
where
go st = K.MkStream $ \gst yld _ stp ->

View File

@ -49,8 +49,8 @@
module Streamly.Internal.Data.Stream.StreamK
(
-- * The stream type
Stream(..) -- XXX stop exporting this
, StreamK
Stream
, StreamK(..)
, fromStream
, toStream
@ -285,7 +285,7 @@ toStream = Stream.fromStreamK
-- concurrent version could be async or ahead etc. Depending on how we queue
-- back the feedback portion b, it could be DFS or BFS style.
--
unfoldrA :: (b -> Maybe (m a, b)) -> b -> Stream m a
unfoldrA :: (b -> Maybe (m a, b)) -> b -> StreamK m a
unfoldrA = undefined
-}
@ -293,35 +293,35 @@ unfoldrA = undefined
-- Special generation
-------------------------------------------------------------------------------
repeatM :: Monad m => m a -> Stream m a
repeatM :: Monad m => m a -> StreamK m a
repeatM = repeatMWith consM
{-# INLINE replicateM #-}
replicateM :: Monad m => Int -> m a -> Stream m a
replicateM :: Monad m => Int -> m a -> StreamK m a
replicateM = replicateMWith consM
{-# INLINE replicate #-}
replicate :: Int -> a -> Stream m a
replicate :: Int -> a -> StreamK m a
replicate n a = go n
where
go cnt = if cnt <= 0 then nil else a `cons` go (cnt - 1)
{-# INLINE fromIndicesM #-}
fromIndicesM :: Monad m => (Int -> m a) -> Stream m a
fromIndicesM :: Monad m => (Int -> m a) -> StreamK m a
fromIndicesM = fromIndicesMWith consM
{-# INLINE fromIndices #-}
fromIndices :: (Int -> a) -> Stream m a
fromIndices :: (Int -> a) -> StreamK m a
fromIndices gen = go 0
where
go n = gen n `cons` go (n + 1)
{-# INLINE iterate #-}
iterate :: (a -> a) -> a -> Stream m a
iterate :: (a -> a) -> a -> StreamK m a
iterate step = go
where
go !s = cons s (go (step s))
{-# INLINE iterateM #-}
iterateM :: Monad m => (a -> m a) -> m a -> Stream m a
iterateM :: Monad m => (a -> m a) -> m a -> StreamK m a
iterateM = iterateMWith consM
-------------------------------------------------------------------------------
@ -329,7 +329,7 @@ iterateM = iterateMWith consM
-------------------------------------------------------------------------------
{-# INLINE fromList #-}
fromList :: [a] -> Stream m a
fromList :: [a] -> StreamK m a
fromList = fromFoldable
-------------------------------------------------------------------------------
@ -337,7 +337,7 @@ fromList = fromFoldable
-------------------------------------------------------------------------------
{-# INLINE foldr1 #-}
foldr1 :: Monad m => (a -> a -> a) -> Stream m a -> m (Maybe a)
foldr1 :: Monad m => (a -> a -> a) -> StreamK m a -> m (Maybe a)
foldr1 step m = do
r <- uncons m
case r of
@ -354,7 +354,7 @@ foldr1 step m = do
-- | Like 'foldx', but with a monadic step function.
{-# INLINABLE foldlMx' #-}
foldlMx' :: Monad m
=> (x -> a -> m x) -> m x -> (x -> m b) -> Stream m a -> m b
=> (x -> a -> m x) -> m x -> (x -> m b) -> StreamK m a -> m b
foldlMx' step begin done = go begin
where
go !acc m1 =
@ -379,7 +379,7 @@ foldlMx' step begin done = go begin
-- 5050
--
{-# INLINABLE fold #-}
fold :: Monad m => FL.Fold m a b -> Stream m a -> m b
fold :: Monad m => FL.Fold m a b -> StreamK m a -> m b
fold (FL.Fold step begin done) m = do
res <- begin
case res of
@ -408,7 +408,7 @@ fold (FL.Fold step begin done) m = do
-- /Internal/
{-# INLINE foldEither #-}
foldEither :: Monad m =>
Fold m a b -> Stream m a -> m (Either (Fold m a b) (b, Stream m a))
Fold m a b -> StreamK m a -> m (Either (Fold m a b) (b, StreamK m a))
foldEither (FL.Fold step begin done) m = do
res <- begin
case res of
@ -436,7 +436,7 @@ foldEither (FL.Fold step begin done) m = do
-- would be 'StreamK.nil' if the stream finished before the fold.
--
{-# INLINE foldBreak #-}
foldBreak :: Monad m => Fold m a b -> Stream m a -> m (b, Stream m a)
foldBreak :: Monad m => Fold m a b -> StreamK m a -> m (b, StreamK m a)
foldBreak fld strm = do
r <- foldEither fld strm
case r of
@ -513,7 +513,7 @@ foldConcat
-- | Like 'foldl'' but with a monadic step function.
{-# INLINE foldlM' #-}
foldlM' :: Monad m => (b -> a -> m b) -> m b -> Stream m a -> m b
foldlM' :: Monad m => (b -> a -> m b) -> m b -> StreamK m a -> m b
foldlM' step begin = foldlMx' step begin return
------------------------------------------------------------------------------
@ -521,7 +521,7 @@ foldlM' step begin = foldlMx' step begin return
------------------------------------------------------------------------------
{-# INLINE head #-}
head :: Monad m => Stream m a -> m (Maybe a)
head :: Monad m => StreamK m a -> m (Maybe a)
-- head = foldrM (\x _ -> return $ Just x) (return Nothing)
head m =
let stop = return Nothing
@ -530,7 +530,7 @@ head m =
in foldStream defState yieldk single stop m
{-# INLINE elem #-}
elem :: (Monad m, Eq a) => a -> Stream m a -> m Bool
elem :: (Monad m, Eq a) => a -> StreamK m a -> m Bool
elem e = go
where
go m1 =
@ -540,7 +540,7 @@ elem e = go
in foldStream defState yieldk single stop m1
{-# INLINE notElem #-}
notElem :: (Monad m, Eq a) => a -> Stream m a -> m Bool
notElem :: (Monad m, Eq a) => a -> StreamK m a -> m Bool
notElem e = go
where
go m1 =
@ -550,7 +550,7 @@ notElem e = go
in foldStream defState yieldk single stop m1
{-# INLINABLE all #-}
all :: Monad m => (a -> Bool) -> Stream m a -> m Bool
all :: Monad m => (a -> Bool) -> StreamK m a -> m Bool
all p = go
where
go m1 =
@ -561,7 +561,7 @@ all p = go
in foldStream defState yieldk single (return True) m1
{-# INLINABLE any #-}
any :: Monad m => (a -> Bool) -> Stream m a -> m Bool
any :: Monad m => (a -> Bool) -> StreamK m a -> m Bool
any p = go
where
go m1 =
@ -573,11 +573,11 @@ any p = go
-- | Extract the last element of the stream, if any.
{-# INLINE last #-}
last :: Monad m => Stream m a -> m (Maybe a)
last :: Monad m => StreamK m a -> m (Maybe a)
last = foldlx' (\_ y -> Just y) Nothing id
{-# INLINE minimum #-}
minimum :: (Monad m, Ord a) => Stream m a -> m (Maybe a)
minimum :: (Monad m, Ord a) => StreamK m a -> m (Maybe a)
minimum = go Nothing
where
go Nothing m1 =
@ -601,7 +601,7 @@ minimum = go Nothing
{-# INLINE minimumBy #-}
minimumBy
:: (Monad m)
=> (a -> a -> Ordering) -> Stream m a -> m (Maybe a)
=> (a -> a -> Ordering) -> StreamK m a -> m (Maybe a)
minimumBy cmp = go Nothing
where
go Nothing m1 =
@ -621,7 +621,7 @@ minimumBy cmp = go Nothing
in foldStream defState yieldk single stop m1
{-# INLINE maximum #-}
maximum :: (Monad m, Ord a) => Stream m a -> m (Maybe a)
maximum :: (Monad m, Ord a) => StreamK m a -> m (Maybe a)
maximum = go Nothing
where
go Nothing m1 =
@ -643,7 +643,7 @@ maximum = go Nothing
in foldStream defState yieldk single stop m1
{-# INLINE maximumBy #-}
maximumBy :: Monad m => (a -> a -> Ordering) -> Stream m a -> m (Maybe a)
maximumBy :: Monad m => (a -> a -> Ordering) -> StreamK m a -> m (Maybe a)
maximumBy cmp = go Nothing
where
go Nothing m1 =
@ -663,7 +663,7 @@ maximumBy cmp = go Nothing
in foldStream defState yieldk single stop m1
{-# INLINE (!!) #-}
(!!) :: Monad m => Stream m a -> Int -> m (Maybe a)
(!!) :: Monad m => StreamK m a -> Int -> m (Maybe a)
m !! i = go i m
where
go n m1 =
@ -675,7 +675,7 @@ m !! i = go i m
in foldStream defState yieldk single (return Nothing) m1
{-# INLINE lookup #-}
lookup :: (Monad m, Eq a) => a -> Stream m (a, b) -> m (Maybe b)
lookup :: (Monad m, Eq a) => a -> StreamK m (a, b) -> m (Maybe b)
lookup e = go
where
go m1 =
@ -686,7 +686,7 @@ lookup e = go
in foldStream defState yieldk single (return Nothing) m1
{-# INLINE findM #-}
findM :: Monad m => (a -> m Bool) -> Stream m a -> m (Maybe a)
findM :: Monad m => (a -> m Bool) -> StreamK m a -> m (Maybe a)
findM p = go
where
go m1 =
@ -699,11 +699,11 @@ findM p = go
in foldStream defState yieldk single (return Nothing) m1
{-# INLINE find #-}
find :: Monad m => (a -> Bool) -> Stream m a -> m (Maybe a)
find :: Monad m => (a -> Bool) -> StreamK m a -> m (Maybe a)
find p = findM (return . p)
{-# INLINE findIndices #-}
findIndices :: (a -> Bool) -> Stream m a -> Stream m Int
findIndices :: (a -> Bool) -> StreamK m a -> StreamK m Int
findIndices p = go 0
where
go offset m1 = mkStream $ \st yld sng stp ->
@ -721,7 +721,7 @@ findIndices p = go 0
-- | Apply a monadic action to each element of the stream and discard the
-- output of the action.
{-# INLINE mapM_ #-}
mapM_ :: Monad m => (a -> m b) -> Stream m a -> m ()
mapM_ :: Monad m => (a -> m b) -> StreamK m a -> m ()
mapM_ f = go
where
go m1 =
@ -731,7 +731,7 @@ mapM_ f = go
in foldStream defState yieldk single stop m1
{-# INLINE mapM #-}
mapM :: Monad m => (a -> m b) -> Stream m a -> Stream m b
mapM :: Monad m => (a -> m b) -> StreamK m a -> StreamK m b
mapM = mapMWith consM
------------------------------------------------------------------------------
@ -739,13 +739,13 @@ mapM = mapMWith consM
------------------------------------------------------------------------------
{-# INLINABLE toList #-}
toList :: Monad m => Stream m a -> m [a]
toList :: Monad m => StreamK m a -> m [a]
toList = foldr (:) []
-- Based on suggestions by David Feuer and Pranay Sashank
{-# INLINE hoist #-}
hoist :: (Monad m, Monad n)
=> (forall x. m x -> n x) -> Stream m a -> Stream n a
=> (forall x. m x -> n x) -> StreamK m a -> StreamK n a
hoist f str =
mkStream $ \st yld sng stp ->
let single = return . sng
@ -759,7 +759,7 @@ hoist f str =
-------------------------------------------------------------------------------
{-# INLINE scanlx' #-}
scanlx' :: (x -> a -> x) -> x -> (x -> b) -> Stream m a -> Stream m b
scanlx' :: (x -> a -> x) -> x -> (x -> b) -> StreamK m a -> StreamK m b
scanlx' step begin done m =
cons (done begin) $ go m begin
where
@ -771,7 +771,7 @@ scanlx' step begin done m =
in foldStream (adaptState st) yieldk single stp m1
{-# INLINE scanl' #-}
scanl' :: (b -> a -> b) -> b -> Stream m a -> Stream m b
scanl' :: (b -> a -> b) -> b -> StreamK m a -> StreamK m b
scanl' step begin = scanlx' step begin id
-------------------------------------------------------------------------------
@ -779,7 +779,7 @@ scanl' step begin = scanlx' step begin id
-------------------------------------------------------------------------------
{-# INLINE filter #-}
filter :: (a -> Bool) -> Stream m a -> Stream m a
filter :: (a -> Bool) -> StreamK m a -> StreamK m a
filter p = go
where
go m1 = mkStream $ \st yld sng stp ->
@ -790,7 +790,7 @@ filter p = go
in foldStream st yieldk single stp m1
{-# INLINE take #-}
take :: Int -> Stream m a -> Stream m a
take :: Int -> StreamK m a -> StreamK m a
take = go
where
go n1 m1 = mkStream $ \st yld sng stp ->
@ -800,7 +800,7 @@ take = go
else foldStream st yieldk sng stp m1
{-# INLINE takeWhile #-}
takeWhile :: (a -> Bool) -> Stream m a -> Stream m a
takeWhile :: (a -> Bool) -> StreamK m a -> StreamK m a
takeWhile p = go
where
go m1 = mkStream $ \st yld sng stp ->
@ -811,7 +811,7 @@ takeWhile p = go
in foldStream st yieldk single stp m1
{-# INLINE drop #-}
drop :: Int -> Stream m a -> Stream m a
drop :: Int -> StreamK m a -> StreamK m a
drop n m = unShare (go n m)
where
go n1 m1 = mkStream $ \st yld sng stp ->
@ -823,7 +823,7 @@ drop n m = unShare (go n m)
else foldStreamShared st yieldk single stp m1
{-# INLINE dropWhile #-}
dropWhile :: (a -> Bool) -> Stream m a -> Stream m a
dropWhile :: (a -> Bool) -> StreamK m a -> StreamK m a
dropWhile p = go
where
go m1 = mkStream $ \st yld sng stp ->
@ -840,7 +840,7 @@ dropWhile p = go
-- Be careful when modifying this, this uses a consM (|:) deliberately to allow
-- other stream types to overload it.
{-# INLINE sequence #-}
sequence :: Monad m => Stream m (m a) -> Stream m a
sequence :: Monad m => StreamK m (m a) -> StreamK m a
sequence = go
where
go m1 = mkStream $ \st yld sng stp ->
@ -853,7 +853,7 @@ sequence = go
-------------------------------------------------------------------------------
{-# INLINE intersperseM #-}
intersperseM :: Monad m => m a -> Stream m a -> Stream m a
intersperseM :: Monad m => m a -> StreamK m a -> StreamK m a
intersperseM a = prependingStart
where
prependingStart m1 = mkStream $ \st yld sng stp ->
@ -868,11 +868,11 @@ intersperseM a = prependingStart
in foldStream st yieldk single stp m2
{-# INLINE intersperse #-}
intersperse :: Monad m => a -> Stream m a -> Stream m a
intersperse :: Monad m => a -> StreamK m a -> StreamK m a
intersperse a = intersperseM (return a)
{-# INLINE insertBy #-}
insertBy :: (a -> a -> Ordering) -> a -> Stream m a -> Stream m a
insertBy :: (a -> a -> Ordering) -> a -> StreamK m a -> StreamK m a
insertBy cmp x = go
where
go m1 = mkStream $ \st yld _ _ ->
@ -890,7 +890,7 @@ insertBy cmp x = go
------------------------------------------------------------------------------
{-# INLINE deleteBy #-}
deleteBy :: (a -> a -> Bool) -> a -> Stream m a -> Stream m a
deleteBy :: (a -> a -> Bool) -> a -> StreamK m a -> StreamK m a
deleteBy eq x = go
where
go m1 = mkStream $ \st yld sng stp ->
@ -905,7 +905,7 @@ deleteBy eq x = go
-------------------------------------------------------------------------------
{-# INLINE mapMaybe #-}
mapMaybe :: (a -> Maybe b) -> Stream m a -> Stream m b
mapMaybe :: (a -> Maybe b) -> StreamK m a -> StreamK m b
mapMaybe f = go
where
go m1 = mkStream $ \st yld sng stp ->
@ -923,7 +923,7 @@ mapMaybe f = go
--
-- @since 0.1.0
{-# INLINE zipWith #-}
zipWith :: Monad m => (a -> b -> c) -> Stream m a -> Stream m b -> Stream m c
zipWith :: Monad m => (a -> b -> c) -> StreamK m a -> StreamK m b -> StreamK m c
zipWith f = zipWithM (\a b -> return (f a b))
-- | Zip two streams serially using a monadic zipping function.
@ -931,7 +931,7 @@ zipWith f = zipWithM (\a b -> return (f a b))
-- @since 0.1.0
{-# INLINE zipWithM #-}
zipWithM :: Monad m =>
(a -> b -> m c) -> Stream m a -> Stream m b -> Stream m c
(a -> b -> m c) -> StreamK m a -> StreamK m b -> StreamK m c
zipWithM f = go
where
@ -951,7 +951,7 @@ zipWithM f = go
{-# INLINE mergeByM #-}
mergeByM :: Monad m =>
(a -> a -> m Ordering) -> Stream m a -> Stream m a -> Stream m a
(a -> a -> m Ordering) -> StreamK m a -> StreamK m a -> StreamK m a
mergeByM cmp = go
where
@ -1019,7 +1019,7 @@ mergeByM cmp = go
in foldStream st yield single stop mx
{-# INLINE mergeBy #-}
mergeBy :: (a -> a -> Ordering) -> Stream m a -> Stream m a -> Stream m a
mergeBy :: (a -> a -> Ordering) -> StreamK m a -> StreamK m a -> StreamK m a
-- XXX GHC: This has slightly worse performance than replacing "r <- cmp x y"
-- with "let r = cmp x y" in the monadic version. The definition below is
-- exactly the same as mergeByM except this change.
@ -1087,7 +1087,7 @@ mergeBy cmp = go
------------------------------------------------------------------------------
{-# INLINE the #-}
the :: (Eq a, Monad m) => Stream m a -> m (Maybe a)
the :: (Eq a, Monad m) => StreamK m a -> m (Maybe a)
the m = do
r <- uncons m
case r of
@ -1105,7 +1105,7 @@ the m = do
-- Alternative & MonadPlus
------------------------------------------------------------------------------
_alt :: Stream m a -> Stream m a -> Stream m a
_alt :: StreamK m a -> StreamK m a -> StreamK m a
_alt m1 m2 = mkStream $ \st yld sng stp ->
let stop = foldStream st yld sng stp m2
in foldStream st yld sng stop m1
@ -1118,7 +1118,7 @@ _alt m1 m2 = mkStream $ \st yld sng stp ->
-- XXX handle and test cross thread state transfer
withCatchError
:: MonadError e m
=> Stream m a -> (e -> Stream m a) -> Stream m a
=> StreamK m a -> (e -> StreamK m a) -> StreamK m a
withCatchError m h =
mkStream $ \_ stp sng yld ->
let run x = unStream x Nothing stp sng yieldk
@ -1150,8 +1150,8 @@ splitAt n ls
parseDBreak
:: Monad m
=> PR.Parser a m b
-> Stream m a
-> m (Either ParseError b, Stream m a)
-> StreamK m a
-> m (Either ParseError b, StreamK m a)
parseDBreak (PR.Parser pstep initial extract) stream = do
res <- initial
case res of
@ -1238,7 +1238,7 @@ parseDBreak (PR.Parser pstep initial extract) stream = do
-- and convert ParserD to ParserK for element parsing using StreamK.
{-# INLINE parseD #-}
parseD :: Monad m =>
Parser.Parser a m b -> Stream m a -> m (Either ParseError b)
Parser.Parser a m b -> StreamK m a -> m (Either ParseError b)
parseD f = fmap fst . parseDBreak f
-------------------------------------------------------------------------------
@ -1373,7 +1373,7 @@ parseBreakChunks parser input = do
{-# INLINE parseChunks #-}
parseChunks :: (Monad m, Unbox a) =>
ParserK.Parser a m b -> Stream m (Array a) -> m (Either ParseError b)
ParserK.Parser a m b -> StreamK m (Array a) -> m (Either ParseError b)
parseChunks f = fmap fst . parseBreakChunks f
-------------------------------------------------------------------------------
@ -1393,7 +1393,7 @@ parseChunks f = fmap fst . parseBreakChunks f
-- /O(n) space/
--
{-# INLINE sortBy #-}
sortBy :: Monad m => (a -> a -> Ordering) -> Stream m a -> Stream m a
sortBy :: Monad m => (a -> a -> Ordering) -> StreamK m a -> StreamK m a
-- sortBy f = Stream.concatPairsWith (Stream.mergeBy f) Stream.fromPure
sortBy cmp =
let p =

View File

@ -19,14 +19,14 @@ where
import Control.Monad.Trans.Class (MonadTrans(lift))
import Control.Monad.Trans.State.Strict (StateT)
import Streamly.Internal.Data.Stream.StreamK
(Stream, nil, cons, uncons, concatEffect)
(StreamK, nil, cons, uncons, concatEffect)
import qualified Control.Monad.Trans.State.Strict as State
-- | Lazy left fold to an arbitrary transformer monad.
{-# INLINE foldlT #-}
foldlT :: (Monad m, Monad (s m), MonadTrans s)
=> (s m b -> a -> s m b) -> s m b -> Stream m a -> s m b
=> (s m b -> a -> s m b) -> s m b -> StreamK m a -> s m b
foldlT step = go
where
go acc m1 = do
@ -38,7 +38,7 @@ foldlT step = go
-- | Right associative fold to an arbitrary transformer monad.
{-# INLINE foldrT #-}
foldrT :: (Monad m, Monad (s m), MonadTrans s)
=> (a -> s m b -> s m b) -> s m b -> Stream m a -> s m b
=> (a -> s m b -> s m b) -> s m b -> StreamK m a -> s m b
foldrT step final = go
where
go m1 = do
@ -52,7 +52,7 @@ foldrT step final = go
------------------------------------------------------------------------------
{-# INLINE evalStateT #-}
evalStateT :: Monad m => m s -> Stream (StateT s m) a -> Stream m a
evalStateT :: Monad m => m s -> StreamK (StateT s m) a -> StreamK m a
evalStateT = go
where
@ -66,7 +66,7 @@ evalStateT = go
{-# INLINE liftInner #-}
liftInner :: (Monad m, MonadTrans t, Monad (t m)) =>
Stream m a -> Stream (t m) a
StreamK m a -> StreamK (t m) a
liftInner = go
where

File diff suppressed because it is too large Load Diff

View File

@ -79,7 +79,7 @@ import qualified Streamly.Internal.Data.Stream.StreamK.Type as K
--
-- >>> (<>) = Stream.append
--
newtype StreamK m a = StreamK (K.Stream m a)
newtype StreamK m a = StreamK (K.StreamK m a)
-- XXX when deriving do we inherit an INLINE?
deriving (Semigroup, Monoid)
@ -90,11 +90,11 @@ type Stream = StreamK
------------------------------------------------------------------------------
{-# INLINE_EARLY fromStreamK #-}
fromStreamK :: K.Stream m a -> Stream m a
fromStreamK :: K.StreamK m a -> Stream m a
fromStreamK = StreamK
{-# INLINE_EARLY toStreamK #-}
toStreamK :: Stream m a -> K.Stream m a
toStreamK :: Stream m a -> K.StreamK m a
toStreamK (StreamK k) = k
{-# INLINE_EARLY fromStreamD #-}

View File

@ -591,7 +591,7 @@ fromStreamD = Unfold step pure
Stop -> Stop) <$> step1 defState state1
{-# INLINE_NORMAL fromStreamK #-}
fromStreamK :: Applicative m => Unfold m (K.Stream m a) a
fromStreamK :: Applicative m => Unfold m (K.StreamK m a) a
fromStreamK = Unfold step pure
where

View File

@ -16,7 +16,7 @@
-- Then, fromStreamK/toStreamK are inlined in the last phase:
--
-- {-# INLINE_LATE toStreamK #-}
-- toStreamK :: Monad m => Stream m a -> K.Stream m a```
-- toStreamK :: Monad m => Stream m a -> K.StreamK m a```
--
-- The fallback rules make sure that if we could not fuse the direct style
-- operations then better use the CPS style operation, because unfused direct

View File

@ -815,10 +815,10 @@ cleanupSVar workerSet = do
-- Used for concurrent evaluation of streams using a Channel.
{-# INLINE concatMapDivK #-}
concatMapDivK :: Monad m =>
(K.Stream m a -> m ())
-> (a -> K.Stream m b)
-> K.Stream m a
-> K.Stream m b
(K.StreamK m a -> m ())
-> (a -> K.StreamK m b)
-> K.StreamK m a
-> K.StreamK m b
concatMapDivK useTail useHead stream =
K.mkStream $ \st yld sng stp -> do
let foldShared = K.foldStreamShared st yld sng stp

View File

@ -189,9 +189,9 @@ parEval modifier input = withChannel modifier input (const id)
_appendGeneric :: MonadAsync m =>
((Config -> Config) -> m (Channel m a))
-> (Config -> Config)
-> K.Stream m a
-> K.Stream m a
-> K.Stream m a
-> K.StreamK m a
-> K.StreamK m a
-> K.StreamK m a
_appendGeneric newChan modifier stream1 stream2 = K.concatEffect action
where
@ -216,7 +216,7 @@ _appendGeneric newChan modifier stream1 stream2 = K.concatEffect action
-- The output stream is the result of the evaluation.
{-# INLINE appendWithK #-}
appendWithK :: MonadAsync m =>
(Config -> Config) -> K.Stream m a -> K.Stream m a -> K.Stream m a
(Config -> Config) -> K.StreamK m a -> K.StreamK m a -> K.StreamK m a
appendWithK modifier stream1 stream2 =
{-
if getOrdered (modifier defaultConfig)
@ -236,7 +236,7 @@ appendWithK modifier stream1 stream2 =
--
{-# INLINE _appendWithChanK #-}
_appendWithChanK :: MonadAsync m =>
Channel m a -> K.Stream m a -> K.Stream m a -> K.Stream m a
Channel m a -> K.StreamK m a -> K.StreamK m a -> K.StreamK m a
_appendWithChanK chan stream1 stream2 =
K.before (toChannelK chan stream2) stream1
@ -284,8 +284,8 @@ parTwo modifier stream1 stream2 =
{-# INLINE mkEnqueue #-}
mkEnqueue :: MonadAsync m =>
Channel m b
-> ((K.Stream m a -> m ()) -> K.Stream m a -> K.Stream m b)
-> m (K.Stream m a -> m ())
-> ((K.StreamK m a -> m ()) -> K.StreamK m a -> K.StreamK m b)
-> m (K.StreamK m a -> m ())
mkEnqueue chan runner = do
runInIO <- askRunInIO
return
@ -308,7 +308,7 @@ mkEnqueue chan runner = do
-- concurrently map and evaluate a stream.
{-# INLINE parConcatMapChanK #-}
parConcatMapChanK :: MonadAsync m =>
Channel m b -> (a -> K.Stream m b) -> K.Stream m a -> K.Stream m b
Channel m b -> (a -> K.StreamK m b) -> K.StreamK m a -> K.StreamK m b
parConcatMapChanK chan f stream =
let run q = concatMapDivK q f
in K.concatMapEffect (`run` stream) (mkEnqueue chan run)
@ -316,7 +316,7 @@ parConcatMapChanK chan f stream =
{-# INLINE parConcatMapChanKAny #-}
parConcatMapChanKAny :: MonadAsync m =>
Channel m b -> (a -> K.Stream m b) -> K.Stream m a -> K.Stream m b
Channel m b -> (a -> K.StreamK m b) -> K.StreamK m a -> K.StreamK m b
parConcatMapChanKAny chan f stream =
let done = K.nilM (stopChannel chan)
run q = concatMapDivK q (\x -> K.append (f x) done)
@ -324,7 +324,7 @@ parConcatMapChanKAny chan f stream =
{-# INLINE parConcatMapChanKFirst #-}
parConcatMapChanKFirst :: MonadAsync m =>
Channel m b -> (a -> K.Stream m b) -> K.Stream m a -> K.Stream m b
Channel m b -> (a -> K.StreamK m b) -> K.StreamK m a -> K.StreamK m b
parConcatMapChanKFirst chan f stream =
let done = K.nilM (stopChannel chan)
run q = concatMapDivK q f
@ -341,9 +341,9 @@ parConcatMapChanKFirst chan f stream =
parConcatMapChanKGeneric :: MonadAsync m =>
(Config -> Config)
-> Channel m b
-> (a -> K.Stream m b)
-> K.Stream m a
-> K.Stream m b
-> (a -> K.StreamK m b)
-> K.StreamK m a
-> K.StreamK m b
parConcatMapChanKGeneric modifier chan f stream = do
let cfg = modifier defaultConfig
case getStopWhen cfg of
@ -359,7 +359,7 @@ parConcatMapChanKGeneric modifier chan f stream = do
--
{-# INLINE parConcatMapK #-}
parConcatMapK :: MonadAsync m =>
(Config -> Config) -> (a -> K.Stream m b) -> K.Stream m a -> K.Stream m b
(Config -> Config) -> (a -> K.StreamK m b) -> K.StreamK m a -> K.StreamK m b
parConcatMapK modifier f input =
let g = parConcatMapChanKGeneric modifier
in withChannelK modifier input (`g` f)

View File

@ -81,9 +81,9 @@ newChannel modifier =
{-# INLINE withChannelK #-}
withChannelK :: MonadAsync m =>
(Config -> Config)
-> K.Stream m a
-> (Channel m b -> K.Stream m a -> K.Stream m b)
-> K.Stream m b
-> K.StreamK m a
-> (Channel m b -> K.StreamK m a -> K.StreamK m b)
-> K.StreamK m b
withChannelK modifier input evaluator = K.concatEffect action
where

View File

@ -55,9 +55,9 @@ import Streamly.Internal.Data.Stream.Channel.Worker
{-# INLINE enqueueLIFO #-}
enqueueLIFO ::
Channel m a
-> IORef ([(RunInIO m, K.Stream m a)], [(RunInIO m, K.Stream m a)])
-> IORef ([(RunInIO m, K.StreamK m a)], [(RunInIO m, K.StreamK m a)])
-> Bool
-> (RunInIO m, K.Stream m a)
-> (RunInIO m, K.StreamK m a)
-> IO ()
enqueueLIFO sv q inner m = do
atomicModifyIORefCAS_ q $ \(xs, ys) ->
@ -68,8 +68,8 @@ data QResult a = QEmpty | QOuter a | QInner a
{-# INLINE dequeue #-}
dequeue :: MonadIO m =>
IORef ([(RunInIO m, K.Stream m a)], [(RunInIO m, K.Stream m a)])
-> m (QResult (RunInIO m, K.Stream m a))
IORef ([(RunInIO m, K.StreamK m a)], [(RunInIO m, K.StreamK m a)])
-> m (QResult (RunInIO m, K.StreamK m a))
dequeue qref =
liftIO
$ atomicModifyIORefCAS qref
@ -83,7 +83,7 @@ data WorkerStatus = Continue | Suspend
{-# INLINE workLoopLIFO #-}
workLoopLIFO
:: MonadRunInIO m
=> IORef ([(RunInIO m, K.Stream m a)], [(RunInIO m, K.Stream m a)])
=> IORef ([(RunInIO m, K.StreamK m a)], [(RunInIO m, K.StreamK m a)])
-> Channel m a
-> Maybe WorkerInfo
-> m ()
@ -144,7 +144,7 @@ workLoopLIFO qref sv winfo = run
{-# INLINE workLoopLIFOLimited #-}
workLoopLIFOLimited
:: forall m a. MonadRunInIO m
=> IORef ([(RunInIO m, K.Stream m a)], [(RunInIO m, K.Stream m a)])
=> IORef ([(RunInIO m, K.StreamK m a)], [(RunInIO m, K.StreamK m a)])
-> Channel m a
-> Maybe WorkerInfo
-> m ()
@ -234,8 +234,8 @@ workLoopLIFOLimited qref sv winfo = run
{-# INLINE enqueueAhead #-}
enqueueAhead ::
Channel m a
-> IORef ([K.Stream m a], Int)
-> (RunInIO m, K.Stream m a)
-> IORef ([K.StreamK m a], Int)
-> (RunInIO m, K.StreamK m a)
-> IO ()
enqueueAhead sv q m = do
-- XXX The queue is LIFO. When parConcatIterate queues more than one items
@ -444,7 +444,7 @@ updateHeapSeq hpVar seqNo =
{-# INLINE underMaxHeap #-}
underMaxHeap ::
Channel m a
-> Heap (Entry Int (AheadHeapEntry K.Stream m a))
-> Heap (Entry Int (AheadHeapEntry K.StreamK m a))
-> IO Bool
underMaxHeap sv hp = do
(_, len) <- readIORef (outputQueue sv)
@ -466,7 +466,7 @@ underMaxHeap sv hp = do
-- False => continue
preStopCheck ::
Channel m a
-> IORef (Heap (Entry Int (AheadHeapEntry K.Stream m a)) , Maybe Int)
-> IORef (Heap (Entry Int (AheadHeapEntry K.StreamK m a)) , Maybe Int)
-> IO Bool
preStopCheck sv heap =
-- check the stop condition under a lock before actually
@ -518,11 +518,11 @@ abortExecution sv winfo = do
--
processHeap
:: MonadRunInIO m
=> IORef ([K.Stream m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry K.Stream m a)), Maybe Int)
=> IORef ([K.StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry K.StreamK m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> AheadHeapEntry K.Stream m a
-> AheadHeapEntry K.StreamK m a
-> Int
-> Bool -- we are draining the heap before we stop
-> m ()
@ -629,8 +629,8 @@ processHeap q heap sv winfo entry sno stopping = loopHeap sno entry
{-# NOINLINE drainHeap #-}
drainHeap
:: MonadRunInIO m
=> IORef ([K.Stream m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry K.Stream m a)), Maybe Int)
=> IORef ([K.StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry K.StreamK m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> m ()
@ -645,11 +645,11 @@ data HeapStatus = HContinue | HStop
processWithoutToken
:: MonadRunInIO m
=> IORef ([K.Stream m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry K.Stream m a)), Maybe Int)
=> IORef ([K.StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry K.StreamK m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> K.Stream m a
-> K.StreamK m a
-> Int
-> m ()
processWithoutToken q heap sv winfo m seqNo = do
@ -725,11 +725,11 @@ data TokenWorkerStatus = TokenContinue Int | TokenSuspend
processWithToken
:: MonadRunInIO m
=> IORef ([K.Stream m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry K.Stream m a)), Maybe Int)
=> IORef ([K.StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry K.StreamK m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> K.Stream m a
-> K.StreamK m a
-> Int
-> m ()
processWithToken q heap sv winfo action sno = do
@ -834,8 +834,8 @@ processWithToken q heap sv winfo action sno = do
workLoopAhead
:: MonadRunInIO m
=> IORef ([K.Stream m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry K.Stream m a)), Maybe Int)
=> IORef ([K.StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry K.StreamK m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> m ()
@ -897,8 +897,8 @@ getLifoSVar mrun cfg = do
wfw <- newIORef False
running <- newIORef Set.empty
q <- newIORef
( [] :: [(RunInIO m, K.Stream m a)]
, [] :: [(RunInIO m, K.Stream m a)]
( [] :: [(RunInIO m, K.StreamK m a)]
, [] :: [(RunInIO m, K.StreamK m a)]
)
-- Sequence number is incremented whenever something is de-queued,
-- therefore, first sequence number would be 0
@ -936,7 +936,7 @@ getLifoSVar mrun cfg = do
-> (Channel m a -> m [ChildEvent a])
-> (Channel m a -> m Bool)
-> (Channel m a -> IO Bool)
-> (IORef ([(RunInIO m, K.Stream m a)], [(RunInIO m, K.Stream m a)])
-> (IORef ([(RunInIO m, K.StreamK m a)], [(RunInIO m, K.StreamK m a)])
-> Channel m a
-> Maybe WorkerInfo
-> m())

View File

@ -44,8 +44,8 @@ data WorkerStatus = Continue | Suspend
{-# INLINE enqueueFIFO #-}
enqueueFIFO ::
Channel m a
-> LinkedQueue (RunInIO m, K.Stream m a)
-> (RunInIO m, K.Stream m a)
-> LinkedQueue (RunInIO m, K.StreamK m a)
-> (RunInIO m, K.StreamK m a)
-> IO ()
enqueueFIFO sv q m = do
pushL q m
@ -54,7 +54,7 @@ enqueueFIFO sv q m = do
{-# INLINE workLoopFIFO #-}
workLoopFIFO
:: MonadRunInIO m
=> LinkedQueue (RunInIO m, K.Stream m a)
=> LinkedQueue (RunInIO m, K.StreamK m a)
-> Channel m a
-> Maybe WorkerInfo
-> m ()
@ -93,7 +93,7 @@ workLoopFIFO q sv winfo = run
{-# INLINE workLoopFIFOLimited #-}
workLoopFIFOLimited
:: forall m a. MonadRunInIO m
=> LinkedQueue (RunInIO m, K.Stream m a)
=> LinkedQueue (RunInIO m, K.StreamK m a)
-> Channel m a
-> Maybe WorkerInfo
-> m ()
@ -180,7 +180,7 @@ getFifoSVar mrun cfg = do
-> (Channel m a -> m [ChildEvent a])
-> (Channel m a -> m Bool)
-> (Channel m a -> IO Bool)
-> (LinkedQueue (RunInIO m, K.Stream m a)
-> (LinkedQueue (RunInIO m, K.StreamK m a)
-> Channel m a
-> Maybe WorkerInfo
-> m())

View File

@ -89,7 +89,7 @@ import Test.Inspection (inspect, hasNoTypeClassesExcept)
-- | Write a stream to an 'SVar' in a non-blocking manner. The stream can then
-- be read back from the SVar using 'fromSVar'.
{-# INLINE toChannelK #-}
toChannelK :: MonadRunInIO m => Channel m a -> K.Stream m a -> m ()
toChannelK :: MonadRunInIO m => Channel m a -> K.StreamK m a -> m ()
toChannelK sv m = do
runIn <- askRunInIO
liftIO $ enqueue sv False (runIn, m)
@ -114,7 +114,7 @@ joinChannel = undefined
-- | Pull a stream from an SVar.
{-# NOINLINE fromChannelRaw #-}
fromChannelRaw :: (MonadIO m, MonadThrow m) => Channel m a -> K.Stream m a
fromChannelRaw :: (MonadIO m, MonadThrow m) => Channel m a -> K.StreamK m a
fromChannelRaw sv = K.MkStream $ \st yld sng stp -> do
list <- readOutputQ sv
-- Reversing the output is important to guarantee that we process the
@ -185,7 +185,7 @@ inspect $ hasNoTypeClassesExcept 'fromChannelRaw
-- it to multiple consumers. or should we use an explicit dupChannel for that?
{-# INLINE fromChannelK #-}
fromChannelK :: MonadAsync m => Channel m a -> K.Stream m a
fromChannelK :: MonadAsync m => Channel m a -> K.StreamK m a
fromChannelK sv =
K.mkStream $ \st yld sng stp -> do
ref <- liftIO $ newIORef ()

View File

@ -28,7 +28,7 @@ import Streamly.Internal.Data.Atomics (atomicModifyIORefCAS_)
import Streamly.Internal.Data.Stream.Channel.Dispatcher (dumpSVarStats)
import Streamly.Internal.Data.Stream.Channel.Worker
(sendYield, sendStop, sendWithDoorBell)
import Streamly.Internal.Data.Stream.StreamK.Type (Stream)
import Streamly.Internal.Data.Stream.StreamK.Type (StreamK)
import Streamly.Internal.Data.Stream.Channel.Types
@ -88,7 +88,7 @@ data Channel m a = Channel
, remainingWork :: Maybe (IORef Count)
, yieldRateInfo :: Maybe YieldRateInfo
, enqueue :: Bool -> (RunInIO m, Stream m a) -> IO ()
, enqueue :: Bool -> (RunInIO m, StreamK m a) -> IO ()
, eagerDispatch :: m ()
, isWorkDone :: IO Bool
, isQueueDone :: IO Bool

View File

@ -15,7 +15,7 @@ module Streamly.Internal.Data.Stream.IsStream.Type {-# DEPRECATED "Please use \"
(
-- * IsStream Type Class
IsStream (..)
, K.Stream (..)
, K.StreamK (..)
-- * Type Conversion
, fromStreamD
@ -124,7 +124,7 @@ import qualified Streamly.Internal.Data.Stream.StreamD.Type as D
, drain, eqBy, cmpBy, fromList, toList, foldrMx, foldlMx'
, foldlx', foldl', fold)
import qualified Streamly.Internal.Data.Stream.StreamK.Type as K
(Stream(..), cons, fromEffect
(StreamK(..), cons, fromEffect
, nil, fromPure, bindWith, drain
, fromFoldable, nilM, repeat)
import qualified Streamly.Internal.Data.Stream.StreamK as StreamK
@ -160,8 +160,8 @@ class
, forall m. MonadAsync m => Applicative (t m)
) =>
IsStream t where
toStream :: t m a -> K.Stream m a
fromStream :: K.Stream m a -> t m a
toStream :: t m a -> K.StreamK m a
fromStream :: K.StreamK m a -> t m a
-- | Constructs a stream by adding a monadic action at the head of an
-- existing stream. For example:
--
@ -231,7 +231,7 @@ fromStreamD = fromStream . D.toStreamK
-- | Adapt a polymorphic consM operation to a StreamK cons operation
{-# INLINE toConsK #-}
toConsK :: IsStream t =>
(m a -> t m a -> t m a) -> m a -> K.Stream m a -> K.Stream m a
(m a -> t m a -> t m a) -> m a -> K.StreamK m a -> K.StreamK m a
toConsK cns x xs = toStream $ x `cns` fromStream xs
------------------------------------------------------------------------------
@ -317,7 +317,7 @@ toList m = D.toList $ toStreamD m
-- continuation and a yield continuation.
{-# INLINE_EARLY mkStream #-}
mkStream :: IsStream t
=> (forall r. State K.Stream m a
=> (forall r. State K.StreamK m a
-> (a -> t m a -> m r)
-> (a -> m r)
-> m r
@ -329,8 +329,8 @@ mkStream k = fromStream $ K.MkStream $ \st yld sng stp ->
{-# RULES "mkStream from stream" mkStream = mkStreamFromStream #-}
mkStreamFromStream :: IsStream t
=> (forall r. State K.Stream m a
-> (a -> K.Stream m a -> m r)
=> (forall r. State K.StreamK m a
-> (a -> K.StreamK m a -> m r)
-> (a -> m r)
-> m r
-> m r)
@ -339,12 +339,12 @@ mkStreamFromStream k = fromStream $ K.MkStream k
{-# RULES "mkStream stream" mkStream = mkStreamStream #-}
mkStreamStream
:: (forall r. State K.Stream m a
-> (a -> K.Stream m a -> m r)
:: (forall r. State K.StreamK m a
-> (a -> K.StreamK m a -> m r)
-> (a -> m r)
-> m r
-> m r)
-> K.Stream m a
-> K.StreamK m a
mkStreamStream = K.MkStream
------------------------------------------------------------------------------
@ -399,7 +399,7 @@ fold fld m = D.fold fld $ toStreamD m
{-# INLINE_EARLY foldStreamShared #-}
foldStreamShared
:: IsStream t
=> State K.Stream m a
=> State K.StreamK m a
-> (a -> t m a -> m r)
-> (a -> m r)
-> m r
@ -414,11 +414,11 @@ foldStreamShared st yld sng stp m =
{-# RULES "foldStreamShared from stream"
foldStreamShared = foldStreamSharedStream #-}
foldStreamSharedStream
:: State K.Stream m a
-> (a -> K.Stream m a -> m r)
:: State K.StreamK m a
-> (a -> K.StreamK m a -> m r)
-> (a -> m r)
-> m r
-> K.Stream m a
-> K.StreamK m a
-> m r
foldStreamSharedStream st yld sng stp m =
let K.MkStream k = m
@ -430,7 +430,7 @@ foldStreamSharedStream st yld sng stp m =
{-# INLINE foldStream #-}
foldStream
:: IsStream t
=> State K.Stream m a
=> State K.StreamK m a
-> (a -> t m a -> m r)
-> (a -> m r)
-> m r

View File

@ -65,7 +65,7 @@ import Streamly.Internal.Data.Stream.StreamD.Type (Step(..))
import qualified Data.Set as Set
import qualified Streamly.Internal.Data.Stream.StreamK.Type as K
(Stream, foldStreamShared, mkStream, foldStream, fromEffect
(StreamK, foldStreamShared, mkStream, foldStream, fromEffect
, nil, concatMapWith, fromPure, bindWith)
import qualified Streamly.Internal.Data.Stream.StreamD.Type as D
(Stream(..), mapM, toStreamK, fromStreamK)
@ -91,7 +91,7 @@ import Prelude hiding (map)
-- :}
{-# INLINABLE withLocal #-}
withLocal :: MonadReader r m => (r -> r) -> K.Stream m a -> K.Stream m a
withLocal :: MonadReader r m => (r -> r) -> K.StreamK m a -> K.StreamK m a
withLocal f m =
K.mkStream $ \st yld sng stp ->
let single = local f . sng
@ -109,7 +109,7 @@ withLocal f m =
{-# NOINLINE runOne #-}
runOne
:: MonadIO m
=> State K.Stream m a -> K.Stream m a -> Maybe WorkerInfo -> m ()
=> State K.StreamK m a -> K.StreamK m a -> Maybe WorkerInfo -> m ()
runOne st m0 winfo =
case getYieldLimit st of
Nothing -> go m0
@ -132,7 +132,7 @@ runOne st m0 winfo =
runOneLimited
:: MonadIO m
=> State K.Stream m a -> K.Stream m a -> Maybe WorkerInfo -> m ()
=> State K.StreamK m a -> K.StreamK m a -> Maybe WorkerInfo -> m ()
runOneLimited st m0 winfo = go m0
where
@ -169,7 +169,7 @@ runOneLimited st m0 winfo = go m0
{-# NOINLINE forkSVarPar #-}
forkSVarPar :: MonadAsync m
=> SVarStopStyle -> K.Stream m a -> K.Stream m a -> K.Stream m a
=> SVarStopStyle -> K.StreamK m a -> K.StreamK m a -> K.StreamK m a
forkSVarPar ss m r = K.mkStream $ \st yld sng stp -> do
sv <- newParallelVar ss st
pushWorkerPar sv (runOne st{streamVar = Just sv} m)
@ -186,9 +186,9 @@ joinStreamVarPar ::
MonadAsync m
=> SVarStyle
-> SVarStopStyle
-> K.Stream m a
-> K.Stream m a
-> K.Stream m a
-> K.StreamK m a
-> K.StreamK m a
-> K.StreamK m a
joinStreamVarPar style ss m1 m2 = K.mkStream $ \st yld sng stp ->
case streamVar st of
Just sv | svarStyle sv == style && svarStopStyle sv == ss -> do
@ -238,7 +238,7 @@ joinStreamVarPar style ss m1 m2 = K.mkStream $ \st yld sng stp ->
-------------------------------------------------------------------------------
{-# INLINE parallelK #-}
parallelK :: MonadAsync m => K.Stream m a -> K.Stream m a -> K.Stream m a
parallelK :: MonadAsync m => K.StreamK m a -> K.StreamK m a -> K.StreamK m a
parallelK = joinStreamVarPar ParallelVar StopNone
-- | XXX we can implement it more efficienty by directly implementing instead
@ -256,7 +256,7 @@ consM m (ParallelT r) = ParallelT $ parallelK (K.fromEffect m) r
--
-- /Pre-release/
{-# INLINE parallelFstK #-}
parallelFstK :: MonadAsync m => K.Stream m a -> K.Stream m a -> K.Stream m a
parallelFstK :: MonadAsync m => K.StreamK m a -> K.StreamK m a -> K.StreamK m a
parallelFstK = joinStreamVarPar ParallelVar StopBy
-- This is a race like combinator for streams.
@ -266,7 +266,7 @@ parallelFstK = joinStreamVarPar ParallelVar StopBy
--
-- /Pre-release/
{-# INLINE parallelMinK #-}
parallelMinK :: MonadAsync m => K.Stream m a -> K.Stream m a -> K.Stream m a
parallelMinK :: MonadAsync m => K.StreamK m a -> K.StreamK m a -> K.StreamK m a
parallelMinK = joinStreamVarPar ParallelVar StopAny
------------------------------------------------------------------------------
@ -277,7 +277,7 @@ parallelMinK = joinStreamVarPar ParallelVar StopAny
--
-- /Pre-release/
--
mkParallelK :: MonadAsync m => K.Stream m a -> K.Stream m a
mkParallelK :: MonadAsync m => K.StreamK m a -> K.StreamK m a
mkParallelK m = K.mkStream $ \st yld sng stp -> do
sv <- newParallelVar StopNone (adaptState st)
-- pushWorkerPar sv (runOne st{streamVar = Just sv} $ toStream m)
@ -327,7 +327,7 @@ mkParallelD m = D.Stream step Nothing
-- is determined by the prevailing 'Streamly.Prelude.maxBuffer' setting.
--
-- @
-- Stream m a -> m b
-- StreamK m a -> m b
-- |
-- -----stream m a ---------------stream m a-----
--
@ -351,7 +351,7 @@ mkParallelD m = D.Stream step Nothing
-- /Pre-release/
{-# INLINE tapAsyncK #-}
tapAsyncK ::
MonadAsync m => (K.Stream m a -> m b) -> K.Stream m a -> K.Stream m a
MonadAsync m => (K.StreamK m a -> m b) -> K.StreamK m a -> K.StreamK m a
tapAsyncK f m = K.mkStream $ \st yld sng stp -> do
sv <- SVar.newFoldSVar st (f . Stream.toStreamK)
K.foldStreamShared st yld sng stp
@ -429,7 +429,7 @@ tapAsyncF f (D.Stream step1 state1) = D.Stream step TapInit
-- /Since: 0.7.0 (maxBuffer applies to ParallelT streams)/
--
-- @since 0.8.0
newtype ParallelT m a = ParallelT {getParallelT :: K.Stream m a}
newtype ParallelT m a = ParallelT {getParallelT :: K.StreamK m a}
instance MonadTrans ParallelT where
{-# INLINE lift #-}
@ -519,7 +519,7 @@ MONAD_COMMON_INSTANCES(ParallelT, MONADPARALLEL)
-- /Pre-release/
--
{-# INLINE_NORMAL newCallbackStream #-}
newCallbackStream :: MonadAsync m => m (a -> m (), K.Stream m a)
newCallbackStream :: MonadAsync m => m (a -> m (), K.StreamK m a)
newCallbackStream = do
sv <- newParallelVar StopNone defState

View File

@ -43,7 +43,7 @@ import Streamly.Internal.Data.Time.Clock (Clock(Monotonic), getTime)
import qualified Streamly.Internal.Data.Stream.StreamD.Type as D
(Stream(..), Step(..), fold)
import qualified Streamly.Internal.Data.Stream.StreamK.Type as K
(Stream(..), mkStream, foldStream, foldStreamShared, nilM)
(Stream, mkStream, foldStream, foldStreamShared, nilM)
import qualified Streamly.Internal.Data.Stream.Serial as Stream
(fromStreamK, toStreamK)

View File

@ -46,7 +46,7 @@ import System.Mem (performMajorGC)
import qualified Streamly.Internal.Data.Stream.StreamD.Type as D
(Stream(..), Step(..))
import qualified Streamly.Internal.Data.Stream.StreamK.Type as K
(Stream(..), foldStreamShared, mkStream, foldStream)
(Stream, foldStreamShared, mkStream, foldStream)
import qualified Streamly.Internal.Data.Stream.Serial as Stream (fromStreamK)
import Streamly.Internal.Data.SVar
@ -113,7 +113,7 @@ toSVar sv m = do
-- | Pull a stream from an SVar.
{-# NOINLINE fromStreamVar #-}
fromStreamVar :: MonadAsync m => SVar K.Stream m a -> K.Stream m a
fromStreamVar sv = K.MkStream $ \st yld sng stp -> do
fromStreamVar sv = K.mkStream $ \st yld sng stp -> do
list <- readOutputQ sv
-- Reversing the output is important to guarantee that we process the
-- outputs in the same order as they were generated by the constituent
@ -130,13 +130,13 @@ fromStreamVar sv = K.MkStream $ \st yld sng stp -> do
stp
{-# INLINE processEvents #-}
processEvents [] = K.MkStream $ \st yld sng stp -> do
processEvents [] = K.mkStream $ \st yld sng stp -> do
done <- postProcess sv
if done
then allDone stp
else K.foldStream st yld sng stp $ fromStreamVar sv
processEvents (ev : es) = K.MkStream $ \st yld sng stp -> do
processEvents (ev : es) = K.mkStream $ \st yld sng stp -> do
let rest = processEvents es
case ev of
ChildYield a -> yld a rest

View File

@ -1,5 +1,5 @@
{-# LANGUAGE UndecidableInstances #-}
{-# OPTIONS_GHC -Wno-orphans #-}
{-# OPTIONS_GHC -Wno-deprecations #-}
-- |
-- Module : Streamly.Internal.Data.Stream.Serial

View File

@ -16,7 +16,7 @@
-- Then, fromStreamK/toStreamK are inlined in the last phase:
--
-- {-# INLINE_LATE toStreamK #-}
-- toStreamK :: Monad m => Stream m a -> K.Stream m a```
-- toStreamK :: Monad m => Stream m a -> K.StreamK m a```
--
-- The fallback rules make sure that if we could not fuse the direct style
-- operations then better use the CPS style operation, because unfused direct