Make changes for "reader" based impl of concatChunks

This commit is contained in:
Harendra Kumar 2023-12-31 17:02:51 +05:30
parent 2417874bed
commit ffea484db8
2 changed files with 56 additions and 58 deletions

View File

@ -23,11 +23,6 @@ module Streamly.Internal.Data.Array
-- Monadic Folds
, writeLastN
-- * Unfolds
, reader
, readerUnsafe
, producer -- experimental
-- * Random Access
-- , (!!)
, getIndex
@ -99,7 +94,6 @@ import Prelude hiding (length, null, last, map, (!!), read, concat)
import Streamly.Internal.Data.MutByteArray.Type (PinnedState(..))
import Streamly.Internal.Data.Serialize.Type (Serialize)
import Streamly.Internal.Data.Fold.Type (Fold(..))
import Streamly.Internal.Data.Producer.Type (Producer(..))
import Streamly.Internal.Data.Stream (Stream)
import Streamly.Internal.Data.Tuple.Strict (Tuple3Fused'(..))
import Streamly.Internal.Data.Unfold.Type (Unfold(..))
@ -110,8 +104,6 @@ import qualified Streamly.Internal.Data.MutByteArray.Type as MBA
import qualified Streamly.Internal.Data.MutArray as MA
import qualified Streamly.Internal.Data.Array.Type as A
import qualified Streamly.Internal.Data.Fold as FL
import qualified Streamly.Internal.Data.Producer.Type as Producer
import qualified Streamly.Internal.Data.Producer as Producer
import qualified Streamly.Internal.Data.Ring as RB
import qualified Streamly.Internal.Data.Stream as D
import qualified Streamly.Internal.Data.Stream as Stream
@ -148,49 +140,6 @@ import Streamly.Internal.Data.Array.Type
-- Elimination
-------------------------------------------------------------------------------
{-# INLINE_NORMAL producer #-}
producer :: forall m a. (Monad m, Unbox a) => Producer m (Array a) a
producer =
Producer.translate A.unsafeThaw A.unsafeFreeze
$ MA.producerWith (return . unsafeInlineIO)
-- | Unfold an array into a stream.
--
{-# INLINE_NORMAL reader #-}
reader :: forall m a. (Monad m, Unbox a) => Unfold m (Array a) a
reader = Producer.simplify producer
-- | Unfold an array into a stream, does not check the end of the array, the
-- user is responsible for terminating the stream within the array bounds. For
-- high performance application where the end condition can be determined by
-- a terminating fold.
--
-- Written in the hope that it may be faster than "read", however, in the case
-- for which this was written, "read" proves to be faster even though the core
-- generated with unsafeRead looks simpler.
--
-- /Pre-release/
--
{-# INLINE_NORMAL readerUnsafe #-}
readerUnsafe :: forall m a. (Monad m, Unbox a) => Unfold m (Array a) a
readerUnsafe = Unfold step inject
where
inject (Array contents start end) =
return (ArrayUnsafe contents end start)
{-# INLINE_LATE step #-}
step (ArrayUnsafe contents end p) = do
-- unsafeInlineIO allows us to run this in Identity monad for pure
-- toList/foldr case which makes them much faster due to not
-- accumulating the list and fusing better with the pure consumers.
--
-- This should be safe as the array contents are guaranteed to be
-- evaluated/written to before we peek at them.
let !x = unsafeInlineIO $ peekAt p contents
let !p1 = INDEX_NEXT(p,a)
return $ D.Yield x (ArrayUnsafe contents end p1)
-- |
--
-- >>> null arr = Array.byteLength arr == 0

View File

@ -54,6 +54,11 @@ module Streamly.Internal.Data.Array.Type
, clone
, pinnedClone
-- * Unfolds
, reader
, readerUnsafe
, producer -- experimental
-- ** Elimination
, unsafeIndexIO
, getIndexUnsafe
@ -76,7 +81,7 @@ module Streamly.Internal.Data.Array.Type
, pinnedWriteN
, writeNUnsafe
, pinnedWriteNUnsafe
, MA.ArrayUnsafe (..)
-- , MA.ArrayUnsafe (..)
, pinnedWriteNAligned
, write
, pinnedWrite
@ -121,6 +126,7 @@ import GHC.Exts (IsList, IsString(..), Addr#)
import GHC.IO (unsafePerformIO)
import GHC.Ptr (Ptr(..))
import Streamly.Internal.Data.Producer.Type (Producer(..))
import Streamly.Internal.Data.MutArray.Type (MutArray(..))
import Streamly.Internal.Data.MutByteArray.Type (MutByteArray)
import Streamly.Internal.Data.Fold.Type (Fold(..))
@ -137,6 +143,7 @@ import qualified Streamly.Internal.Data.MutArray.Type as MA
import qualified Streamly.Internal.Data.Stream.Type as D
import qualified Streamly.Internal.Data.StreamK.Type as K
import qualified Streamly.Internal.Data.MutByteArray.Type as Unboxed
import qualified Streamly.Internal.Data.Producer as Producer
import qualified Streamly.Internal.Data.Unfold.Type as Unfold
import qualified Text.ParserCombinators.ReadPrec as ReadPrec
@ -414,16 +421,15 @@ pinnedChunksOf :: forall m a. (MonadIO m, Unbox a)
=> Int -> D.Stream m a -> D.Stream m (Array a)
pinnedChunksOf n str = D.map unsafeFreeze $ MA.pinnedChunksOf n str
-- | Use the "reader" unfold instead.
-- | Convert a stream of arrays into a stream of their elements.
--
-- @concatChunks = unfoldMany reader@
--
-- We can try this if there are any fusion issues in the unfold.
-- >>> concatChunks = Stream.unfoldMany Array.reader
--
{-# INLINE_NORMAL concatChunks #-}
concatChunks :: forall m a. (MonadIO m, Unbox a)
=> D.Stream m (Array a) -> D.Stream m a
concatChunks :: (MonadIO m, Unbox a) => Stream m (Array a) -> Stream m a
-- XXX this requires MonadIO whereas the unfoldMany version does not
concatChunks = MA.concatChunks . D.map unsafeThaw
-- concatChunks = D.unfoldMany reader
{-# DEPRECATED flattenArrays "Please use \"unfoldMany reader\" instead." #-}
flattenArrays :: forall m a. (MonadIO m, Unbox a)
@ -488,6 +494,49 @@ byteLength = MA.byteLength . unsafeThaw
length :: Unbox a => Array a -> Int
length arr = MA.length (unsafeThaw arr)
{-# INLINE_NORMAL producer #-}
producer :: forall m a. (Monad m, Unbox a) => Producer m (Array a) a
producer =
Producer.translate unsafeThaw unsafeFreeze
$ MA.producerWith (return . unsafeInlineIO)
-- | Unfold an array into a stream.
--
{-# INLINE_NORMAL reader #-}
reader :: forall m a. (Monad m, Unbox a) => Unfold m (Array a) a
reader = Producer.simplify producer
-- | Unfold an array into a stream, does not check the end of the array, the
-- user is responsible for terminating the stream within the array bounds. For
-- high performance application where the end condition can be determined by
-- a terminating fold.
--
-- Written in the hope that it may be faster than "read", however, in the case
-- for which this was written, "read" proves to be faster even though the core
-- generated with unsafeRead looks simpler.
--
-- /Pre-release/
--
{-# INLINE_NORMAL readerUnsafe #-}
readerUnsafe :: forall m a. (Monad m, Unbox a) => Unfold m (Array a) a
readerUnsafe = Unfold step inject
where
inject (Array contents start end) =
return (MA.ArrayUnsafe contents end start)
{-# INLINE_LATE step #-}
step (MA.ArrayUnsafe contents end p) = do
-- unsafeInlineIO allows us to run this in Identity monad for pure
-- toList/foldr case which makes them much faster due to not
-- accumulating the list and fusing better with the pure consumers.
--
-- This should be safe as the array contents are guaranteed to be
-- evaluated/written to before we peek at them.
let !x = unsafeInlineIO $ peekAt p contents
let !p1 = INDEX_NEXT(p,a)
return $ D.Yield x (MA.ArrayUnsafe contents end p1)
-- | Unfold an array into a stream in reverse order.
--
{-# INLINE_NORMAL readerRev #-}