Move the ArrayStream splicing code to MutArray module

This commit is contained in:
Harendra Kumar 2023-12-27 04:59:27 +05:30
parent ec11426699
commit 6b3a8c4a47
2 changed files with 33 additions and 80 deletions

View File

@ -236,6 +236,15 @@ compact :: (MonadIO m, Unbox a)
=> Int -> Stream m (Array a) -> Stream m (Array a)
compact = packArraysChunksOf
-- | Given a stream of arrays, splice them all together to generate a single
-- array. The stream must be /finite/.
--
-- @since 0.7.0
{-# INLINE toArray #-}
toArray :: (MonadIO m, Unbox a) => Stream m (Array a) -> m (Array a)
toArray s =
fmap A.unsafeFreeze $ MA.fromArrayStreamRealloced (fmap A.unsafeThaw s)
-------------------------------------------------------------------------------
-- Split
-------------------------------------------------------------------------------
@ -439,7 +448,7 @@ foldBreak = foldBreakK
-- foldBreak f = runArrayFoldBreak (ChunkFold.fromFold f)
-------------------------------------------------------------------------------
-- Fold to a single Array
-- Elimination - running element parsers
-------------------------------------------------------------------------------
-- When we have to take an array partially, take the last part of the array.
@ -485,83 +494,6 @@ splitAtArrayListRev n ls
arr1 = Array contents end1 end
in ([arr1], arr2:xs)
-------------------------------------------------------------------------------
-- Fold to a single Array
-------------------------------------------------------------------------------
{-
-- XXX Both of these implementations of splicing seem to perform equally well.
-- We need to perform benchmarks over a range of sizes though.
-- CAUTION! length must more than equal to lengths of all the arrays in the
-- stream.
{-# INLINE spliceArraysLenUnsafe #-}
spliceArraysLenUnsafe :: (MonadIO m, Unbox a)
=> Int -> Stream m (MutArray a) -> m (MutArray a)
spliceArraysLenUnsafe len buffered = do
-- XXX The new array's pinned state should depend on the first element
-- of the stream. We should uncons the stream and do the required.
arr <- undefined
D.foldlM' MA.spliceUnsafe (return arr) buffered
{-# INLINE _spliceArrays #-}
_spliceArrays :: (MonadIO m, Unbox a)
=> Stream m (Array a) -> m (Array a)
_spliceArrays s = do
buffered <- D.foldr K.cons K.nil s
len <- K.fold FL.sum (fmap Array.length buffered)
-- XXX The new array's pinned state should depend on the first element
-- of the stream. We should uncons the stream and do the required.
arr <- undefined
final <- D.foldlM' writeArr (return arr) (toStream buffered)
return $ A.unsafeFreeze final
where
writeArr dst arr = MA.spliceUnsafe dst (A.unsafeThaw arr)
{-# INLINE _spliceArraysBuffered #-}
_spliceArraysBuffered :: (MonadIO m, Unbox a)
=> Stream m (Array a) -> m (Array a)
_spliceArraysBuffered s = do
buffered <- D.foldr K.cons K.nil s
len <- K.fold FL.sum (fmap Array.length buffered)
A.unsafeFreeze <$>
spliceArraysLenUnsafe len (fmap A.unsafeThaw (toStream buffered))
-}
{-# INLINE spliceArraysRealloced #-}
spliceArraysRealloced :: forall m a. (MonadIO m, Unbox a)
=> Stream m (Array a) -> m (Array a)
spliceArraysRealloced s = do
res <- D.uncons s
case res of
Just (a, strm) -> do
arr <-
D.foldlM'
MA.spliceExp
(pure (A.unsafeThaw a))
(fmap A.unsafeThaw strm)
liftIO $ A.unsafeFreeze <$> MA.rightSize arr
Nothing -> pure A.nil
-- XXX This should just be "fold A.write"
--
-- | Given a stream of arrays, splice them all together to generate a single
-- array. The stream must be /finite/.
--
-- @since 0.7.0
{-# INLINE toArray #-}
toArray :: (MonadIO m, Unbox a) => Stream m (Array a) -> m (Array a)
toArray = spliceArraysRealloced
-- spliceArrays = _spliceArraysBuffered
-------------------------------------------------------------------------------
-- Elimination - running element parsers
-------------------------------------------------------------------------------
-- GHC parser does not accept {-# ANN type [] NoSpecConstr #-}, so we need
-- to make a newtype.
{-# ANN type List NoSpecConstr #-}

View File

@ -78,6 +78,8 @@ module Streamly.Internal.Data.MutArray.Type
, fromPureStream
, fromByteStr#
, fromPtrN
, fromArrayStreamK
, fromArrayStreamRealloced
-- ** Random writes
, putIndex
@ -200,7 +202,6 @@ module Streamly.Internal.Data.MutArray.Type
-- *** Eliminate to streams
, flattenArrays
, flattenArraysRev
, fromArrayStreamK
-- *** Construct from arrays
-- get chunks without copying
@ -2090,7 +2091,27 @@ fromByteStr# addr = do
return (arr {arrEnd = lenInt})
-------------------------------------------------------------------------------
-- convert stream to a single array
-- convert a stream of arrays to a single array by reallocating and copying
-------------------------------------------------------------------------------
-- XXX Both of these implementations of splicing seem to perform equally well.
-- We need to perform benchmarks over a range of sizes though.
{-# INLINE fromArrayStreamRealloced #-}
fromArrayStreamRealloced :: forall m a. (MonadIO m, Unbox a)
=> Stream m (MutArray a) -> m (MutArray a)
fromArrayStreamRealloced s = do
res <- D.uncons s
case res of
Just (a, strm) -> do
arr <- D.foldlM' spliceExp (pure a) strm
-- Reallocation is exponential so there may be 50% empty space in
-- worst case. One more reallocation to reclaim the space.
liftIO $ rightSize arr
Nothing -> pure nil
-------------------------------------------------------------------------------
-- convert a stream of arrays to a single array by buffering arrays first
-------------------------------------------------------------------------------
{-# INLINE arrayStreamKLength #-}