Add Data.Array.Prim.Pinned and Data.Array.Prim

Respect the new module structure for Internal files
This commit is contained in:
adithyaov 2020-07-16 17:30:55 +05:30 committed by Harendra Kumar
parent 12c672ef1d
commit 4d0631a0c6
20 changed files with 2172 additions and 454 deletions

View File

@ -12,7 +12,9 @@ src/Streamly/Internal/Data/Stream/StreamD/Type.hs
src/Streamly/Internal/Data/Stream/StreamDK/Type.hs
src/Streamly/Internal/Data/Stream/StreamD.hs
src/Streamly/Internal/Data/Pipe/Types.hs
src/Streamly/Internal/Data/Prim/Array/Types.hs
src/Streamly/Internal/Data/SmallArray/Types.hs
src/Streamly/Internal/Data/Unicode/Stream.hs
src/Streamly/Internal/Mutable/Prim/Var.hs
src/Streamly/Internal/Data/Array/Prim/Types.hs
src/Streamly/Internal/Data/Array/Prim/MutTypesInclude.hs
src/Streamly/Internal/Data/Array/Prim/TypesInclude.hs

View File

@ -29,7 +29,12 @@ prelude_other_grp="\
Prelude.Concurrent \
Prelude.Adaptive"
array_grp="Memory.Array Data.Array Data.Prim.Array Data.SmallArray"
array_grp="\
Memory.Array \
Data.Array \
Data.Array.Prim \
Data.SmallArray \
Data.Array.Prim.Pinned"
base_parser_grp="Data.Parser.ParserD Data.Parser.ParserK"
parser_grp="Data.Fold Data.Parser"
@ -54,7 +59,8 @@ base_stream_cmp="Data.Stream.StreamD Data.Stream.StreamK"
serial_wserial_cmp="Prelude.Serial Prelude.WSerial"
serial_async_cmp="Prelude.Serial Prelude.Async"
concurrent_cmp="Prelude.Async Prelude.WAsync Prelude.Ahead Prelude.Parallel"
array_cmp="Memory.Array Data.Prim.Array Data.Array"
array_cmp="Memory.Array Data.Array.Prim Data.Array Data.Array.Prim.Pinned"
pinned_array_cmp="Memory.Array Data.Array.Prim.Pinned"
base_parser_cmp=$base_parser_grp
COMPARISONS="\
base_stream_cmp \
@ -62,6 +68,7 @@ COMPARISONS="\
serial_async_cmp \
concurrent_cmp \
array_cmp \
pinned_array_cmp \
base_parser_cmp"
#------------------------------------------------------------------------------

View File

@ -1,6 +1,6 @@
-- |
-- Module : Main
-- Copyright : (c) 2019 Composewell Technologies
-- Copyright : (c) 2020 Composewell Technologies
--
-- License : BSD-3-Clause
-- Maintainer : streamly@composewell.com
@ -8,25 +8,20 @@
{-# LANGUAGE CPP #-}
import Control.DeepSeq (NFData(..))
#ifndef DATA_PRIM_ARRAY
import Control.DeepSeq (deepseq)
#endif
import System.Random (randomRIO)
import qualified Streamly.Benchmark.Data.ArrayOps as Ops
import Streamly.Benchmark.Common hiding (benchPureSrc)
import Gauge
import Streamly.Benchmark.Common hiding (benchPureSrc)
#ifdef MEMORY_ARRAY
import qualified GHC.Exts as GHC
import Foreign.Storable (Storable(..))
#endif
#ifdef DATA_PRIM_ARRAY
import Data.Primitive.Types (Prim(..))
#if !defined(DATA_ARRAY_PRIM) && !defined(DATA_ARRAY_PRIM_PINNED)
import Control.DeepSeq (deepseq)
#endif
-------------------------------------------------------------------------------
@ -49,13 +44,9 @@ benchIO name src f = bench name $ nfIO $
-- Drain a source that generates an array in the IO monad
{-# INLINE benchIOSrc #-}
benchIOSrc ::
#ifndef DATA_PRIM_ARRAY
NFData a =>
#endif
#ifdef MEMORY_ARRAY
Storable a =>
#elif defined(DATA_PRIM_ARRAY)
Prim a =>
#endif
String -> (Int -> IO (Ops.Stream a)) -> Benchmark
benchIOSrc name src = benchIO name src id
@ -92,7 +83,7 @@ o_1_space_generation value =
"writeN . IsString.fromString"
(Ops.sourceIsString value)
#endif
#ifndef DATA_PRIM_ARRAY
#if !defined(DATA_ARRAY_PRIM) && !defined(DATA_ARRAY_PRIM_PINNED)
#ifdef DATA_SMALLARRAY
, let testStr =
"fromListN " ++
@ -122,7 +113,7 @@ o_1_space_elimination value =
, benchIOSink value "foldl'" Ops.pureFoldl'
, benchIOSink value "read" Ops.unfoldReadDrain
, benchIOSink value "toStreamRev" Ops.toStreamRevDrain
#ifndef DATA_PRIM_ARRAY
#if !defined(DATA_ARRAY_PRIM) && !defined(DATA_ARRAY_PRIM_PINNED)
#ifdef DEVBUILD
, benchPureSink value "foldable/foldl'" Ops.foldableFoldl'
, benchPureSink value "foldable/sum" Ops.foldableSum
@ -154,8 +145,10 @@ moduleName :: String
moduleName = "Data.SmallArray"
#elif defined(MEMORY_ARRAY)
moduleName = "Memory.Array"
#elif defined(DATA_PRIM_ARRAY)
moduleName = "Data.Prim.Array"
#elif defined(DATA_ARRAY_PRIM)
moduleName = "Data.Array.Prim"
#elif defined(DATA_ARRAY_PRIM_PINNED)
-- Spelled to match the Data.Array.Prim.Pinned benchmark target name.
moduleName = "Data.Array.Prim.Pinned"
#else
moduleName = "Data.Array"
#endif

View File

@ -8,8 +8,6 @@
{-# LANGUAGE CPP #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE DeriveGeneric #-}
-- CPP:
-- MEMORY_ARRAY
@ -24,8 +22,7 @@ import Prelude (Bool, Int, Maybe(..), ($), (+), (.), (==), (>), undefined)
import qualified Prelude as P
import qualified Streamly as S hiding (foldMapWith, runStream)
import qualified Streamly.Prelude as S
import qualified Streamly.Prelude as S
#ifndef DATA_PRIM_ARRAY
#ifdef DEVBUILD
@ -33,14 +30,11 @@ import qualified Data.Foldable as F
#endif
#endif
#ifdef MEMORY_ARRAY
import qualified GHC.Exts as GHC
#endif
#ifdef DATA_SMALLARRAY
import qualified Streamly.Internal.Data.SmallArray as A
type Stream = A.SmallArray
#elif defined(MEMORY_ARRAY)
import qualified GHC.Exts as GHC
import qualified Streamly.Memory.Array as A
import qualified Streamly.Internal.Memory.Array as A
type Stream = A.Array

View File

@ -387,13 +387,13 @@ benchmark Data.Array
other-modules: Streamly.Benchmark.Data.ArrayOps
cpp-options: -DDATA_ARRAY
benchmark Data.Prim.Array
benchmark Data.Array.Prim
import: bench-options
type: exitcode-stdio-1.0
hs-source-dirs: .
main-is: Streamly/Benchmark/Data/Array.hs
other-modules: Streamly.Benchmark.Data.ArrayOps
cpp-options: -DDATA_PRIM_ARRAY
cpp-options: -DDATA_ARRAY_PRIM
build-depends: primitive
benchmark Data.SmallArray
@ -404,6 +404,14 @@ benchmark Data.SmallArray
other-modules: Streamly.Benchmark.Data.ArrayOps
cpp-options: -DDATA_SMALLARRAY
benchmark Data.Array.Prim.Pinned
import: bench-options
type: exitcode-stdio-1.0
hs-source-dirs: .
main-is: Streamly/Benchmark/Data/Array.hs
other-modules: Streamly.Benchmark.Data.ArrayOps
cpp-options: -DDATA_ARRAY_PRIM_PINNED
benchmark Memory.Array
import: bench-options
type: exitcode-stdio-1.0

View File

@ -8,8 +8,7 @@
-- Portability : GHC
--
module Streamly.Data.Prim.Array
( PrimArray
, Prim
( Array
-- * Construction
, A.fromListN
@ -39,6 +38,6 @@ module Streamly.Data.Prim.Array
)
where
import Streamly.Internal.Data.Prim.Array (PrimArray, Prim)
import Streamly.Internal.Data.Array.Prim (Array)
import qualified Streamly.Internal.Data.Prim.Array as A
import qualified Streamly.Internal.Data.Array.Prim as A

View File

@ -0,0 +1,110 @@
{-# OPTIONS_GHC -fno-warn-orphans #-}
#include "inline.hs"
-- |
-- Module : Streamly.Internal.Data.Array.Prim
-- Copyright : (c) 2020 Composewell Technologies
--
-- License : BSD-3-Clause
-- Maintainer : streamly@composewell.com
-- Stability : experimental
-- Portability : GHC
--
module Streamly.Internal.Data.Array.Prim
(
Array
-- * Construction
-- Pure List APIs
, A.fromListN
, A.fromList
-- Stream Folds
, fromStreamN
, fromStream
-- Monadic APIs
-- , newArray
, A.writeN -- drop new
, A.write -- full buffer
-- * Elimination
, A.toList
, toStream
, toStreamRev
, read
, unsafeRead
-- * Random Access
, length
, null
, last
-- , (!!)
, readIndex
, A.unsafeIndex
-- , readIndices
-- , readRanges
-- , readFrom -- read from a given position to the end of file
-- , readFromRev -- read from a given position to the beginning of file
-- , readTo -- read from beginning up to the given position
-- , readToRev -- read from end to the given position in file
-- , readFromTo
-- , readFromThenTo
-- , readChunksOfFrom
-- , ...
-- , writeIndex
-- , writeFrom -- start writing at the given position
-- , writeFromRev
-- , writeTo -- write from beginning up to the given position
-- , writeToRev
-- , writeFromTo
-- , writeFromThenTo
--
-- , writeChunksOfFrom
-- , ...
-- , writeIndex
-- , writeIndices
-- , writeRanges
-- -- * Search
-- , bsearch
-- , bsearchIndex
-- , find
-- , findIndex
-- , findIndices
-- -- * In-place mutation (for Mutable Array type)
-- , partitionBy
-- , shuffleBy
-- , foldtWith
-- , foldbWith
-- * Immutable Transformations
-- , streamTransform
-- * Folding Arrays
, streamFold
, fold
-- * Folds with Array as the container
-- , D.lastN
-- * Streaming array operations
, concat
, compact
)
where
import Streamly.Internal.Data.Array.Prim.Types (Array(..), length)
import qualified Streamly.Internal.Data.Array.Prim.Types as A
#include "Streamly/Internal/Data/Array/PrimInclude.hs"

View File

@ -0,0 +1,94 @@
{-# LANGUAGE UnboxedTuples #-}
#include "inline.hs"
-- |
-- Module : Streamly.Internal.Data.Array.Prim.Mut.Types
-- Copyright : (c) 2020 Composewell Technologies
--
-- License : BSD-3-Clause
-- Maintainer : streamly@composewell.com
-- Stability : experimental
-- Portability : GHC
--
module Streamly.Internal.Data.Array.Prim.Mut.Types
(
Array (..)
-- * Construction
, newArray
, unsafeWriteIndex
, spliceTwo
, unsafeCopy
, fromListM
, fromListNM
, fromStreamDN
, fromStreamD
-- * Streams of arrays
, fromStreamDArraysOf
, packArraysChunksOf
, lpackArraysChunksOf
#if !defined(mingw32_HOST_OS)
-- , groupIOVecsOf
#endif
-- * Elimination
, unsafeReadIndex
, length
, byteLength
, writeN
, ArrayUnsafe(..)
, writeNUnsafe
, write
-- * Utilities
, resizeArray
, shrinkArray
)
where
#include "Streamly/Internal/Data/Array/Prim/MutTypesInclude.hs"
-------------------------------------------------------------------------------
-- Allocation (Unpinned)
-------------------------------------------------------------------------------
-- | Allocate a fresh unpinned array with room for the requested number of
-- elements. The contents of the array are left uninitialized.
--
-- This is an internal routine: a reference to the array must not be handed
-- out until the array has been written to and frozen.
{-# INLINE newArray #-}
newArray ::
       forall m a. (MonadIO m, Prim a)
    => Int
    -> m (Array a)
newArray (I# count#) = liftIO (primitive allocate)

    where

    -- Requested size converted from an element count to a byte count.
    nbytes# = count# *# sizeOf# (undefined :: a)

    allocate st0# =
        case newByteArray# nbytes# st0# of
            (# st1#, marr# #) -> (# st1#, Array marr# #)
-- | Resize an (unpinned) mutable array so that it holds the given number of
-- elements. The result is either the original array resized in place or, when
-- that is not possible, a newly allocated (unpinned) array carrying a copy of
-- the original contents. Only the returned array should be used afterwards.
{-# INLINE resizeArray #-}
resizeArray ::
       forall m a. (MonadIO m, Prim a)
    => Array a
    -> Int -- ^ new size in elem count
    -> m (Array a)
resizeArray (Array old#) (I# count#) = liftIO (primitive resize)

    where

    -- The primop works in bytes, so scale the element count.
    nbytes# = count# *# sizeOf# (undefined :: a)

    resize st0# =
        case resizeMutableByteArray# old# nbytes# st0# of
            (# st1#, new# #) -> (# st1#, Array new# #)

View File

@ -0,0 +1,400 @@
-- Copyright : (c) 2020 Composewell Technologies
-- License : BSD-3-Clause
-- Maintainer : streamly@composewell.com
-- Stability : experimental
-- Portability : GHC
import Control.Monad (when)
import Control.Monad.IO.Class (MonadIO(..))
import Data.Primitive.Types (Prim(..), sizeOf)
import Streamly.Internal.Data.Fold.Types (Fold(..))
import Streamly.Internal.Data.SVar (adaptState)
import Streamly.Internal.Data.Strict (Tuple'(..), Tuple3'(..))
import qualified Streamly.Internal.Data.Stream.StreamD.Type as D
import GHC.Exts
import Control.Monad.Primitive
import Prelude hiding (length, unlines)
-------------------------------------------------------------------------------
-- Array Data Type
-------------------------------------------------------------------------------
-- | A mutable array of primitive elements, represented as a raw
-- 'MutableByteArray#' in the 'RealWorld' state thread. The element type @a@
-- is phantom: it does not occur in the constructor field and is only used by
-- the operations in this file (via 'Prim') to size and interpret the bytes.
data Array a = Array (MutableByteArray# RealWorld)
-------------------------------------------------------------------------------
-- Utilities
-------------------------------------------------------------------------------
-- | Copy a run of elements from the source array into the destination array.
-- Note the argument order: destination first, then source. Offsets and the
-- count are given in elements and converted to bytes internally.
--
-- Both arrays must fully contain the specified ranges, but this is not
-- checked. The regions are allowed to overlap, although this is only possible
-- when the same array is provided as both the source and the destination.
{-# INLINE unsafeCopy #-}
unsafeCopy ::
       forall m a. (MonadIO m, Prim a)
    => Array a -- ^ destination array
    -> Int -- ^ offset into destination array
    -> Array a -- ^ source array
    -> Int -- ^ offset into source array
    -> Int -- ^ number of elements to copy
    -> m ()
unsafeCopy (Array dst#) (I# dstOff#) (Array src#) (I# srcOff#) (I# count#) =
    liftIO $
        primitive_ $
            copyMutableByteArray#
                src#
                (inBytes srcOff#)
                dst#
                (inBytes dstOff#)
                (inBytes count#)

    where

    elemSize# = sizeOf# (undefined :: a)

    -- Scale an element count/offset to a byte count/offset.
    inBytes k# = k# *# elemSize#
-------------------------------------------------------------------------------
-- Length
-------------------------------------------------------------------------------
-- XXX rename to byteCount?
-- | Current size of the array in bytes, as reported by
-- 'getSizeofMutableByteArray#'.
{-# INLINE byteLength #-}
byteLength :: MonadIO m => Array a -> m Int
byteLength (Array marr#) = liftIO (primitive querySize)

    where

    querySize st0# =
        case getSizeofMutableByteArray# marr# st0# of
            (# st1#, nbytes# #) -> (# st1#, I# nbytes# #)
-- XXX Rename length to elemCount so that there is no confusion about what it
-- means.
--
-- XXX Since size of 'a' is statically known, we can replace `quot` with shift
-- when it is power of 2. Though it may not matter unless length is used too
-- often.
--
-- | Number of elements in the array: the byte length divided (truncating) by
-- the size of one element.
{-# INLINE length #-}
length ::
       forall m a. (MonadIO m, Prim a)
    => Array a
    -> m Int
length arr = liftIO $ toElems <$> byteLength arr

    where

    toElems nbytes = nbytes `quot` sizeOf (undefined :: a)
-------------------------------------------------------------------------------
-- Random Access
-------------------------------------------------------------------------------
-- | Read the element at the given index. The index is handed straight to the
-- 'Prim' method 'readByteArray#'; no bounds check is performed here.
{-# INLINE unsafeReadIndex #-}
unsafeReadIndex :: (MonadIO m, Prim a) => Array a -> Int -> m a
unsafeReadIndex (Array marr#) (I# ix#) =
    liftIO (primitive (\st# -> readByteArray# marr# ix# st#))
-- | Store an element at the given index. The index is handed straight to the
-- 'Prim' method 'writeByteArray#'; no bounds check is performed here.
{-# INLINE unsafeWriteIndex #-}
unsafeWriteIndex ::
       (MonadIO m, Prim a)
    => Array a -- ^ array
    -> Int -- ^ index
    -> a -- ^ element
    -> m ()
unsafeWriteIndex (Array marr#) (I# ix#) el =
    liftIO (primitive_ (\st# -> writeByteArray# marr# ix# el st#))
-------------------------------------------------------------------------------
-- Construction
-------------------------------------------------------------------------------
-- Note: We do not store the actual length of the array in the Array
-- constructor. Therefore, for the "length" API to work correctly we need to
-- match the ByteArray length with the used length by shrinking it.
--
-- However, it may be expensive to always shrink the array. We may want to
-- shrink only if significant space is being wasted. If we want to do that then
-- we will have to store the used length separately. Or does GHC take care of
-- that?

-- Although the docs are not explicit about it, given how the signature is,
-- the shrinking must be in place. "resizeMutableByteArray#" shrinks the
-- array in place.

-- | Shrink the array (via 'shrinkMutableByteArray#') so that it holds exactly
-- the given number of elements.
{-# INLINE shrinkArray #-}
shrinkArray ::
       forall m a. (MonadIO m, Prim a)
    => Array a
    -> Int -- ^ new size
    -> m ()
shrinkArray (Array marr#) (I# count#) =
    liftIO (primitive_ (shrinkMutableByteArray# marr# nbytes#))

    where

    -- New size converted from an element count to a byte count.
    nbytes# = count# *# sizeOf# (undefined :: a)
-- | Fold the entire input into one array. The buffer starts empty, doubles in
-- capacity (minimum 1) whenever it is full, and is shrunk to the number of
-- elements actually written when the fold is extracted.
--
-- /Caution! Do not use this on infinite streams./
--
-- /Internal/
{-# INLINE_NORMAL write #-}
write :: (MonadIO m, Prim a) => Fold m a (Array a)
write = Fold append begin finish

    where

    -- State: buffer, number of elements written, current capacity.
    begin = do
        buf <- newArray 0
        return (Tuple3' buf 0 0)

    append (Tuple3' buf used cap) el
        | used == cap = do
            let grown = max (cap * 2) 1
            bigger <- resizeArray buf grown
            unsafeWriteIndex bigger used el
            return (Tuple3' bigger (used + 1) grown)
        | otherwise = do
            unsafeWriteIndex buf used el
            return (Tuple3' buf (used + 1) cap)

    finish (Tuple3' buf used _) = shrinkArray buf used >> return buf
-- | @writeN n@ folds a maximum of @n@ elements from the input stream to an
-- 'Array'. Elements beyond the first @n@ are ignored. A negative @n@ is
-- treated as 0 and yields an empty array.
--
-- /Internal/
{-# INLINE_NORMAL writeN #-}
writeN :: (MonadIO m, Prim a) => Int -> Fold m a (Array a)
writeN limit = Fold step initial extract

    where

    -- Clamp once up front. Without this a negative limit would be passed to
    -- the allocator and the "i == limit" guard below would never fire, so
    -- every step would write out of bounds.
    capacity = max limit 0

    initial = do
        marr <- newArray capacity
        return $ Tuple' marr 0

    step (Tuple' marr i) x
        | i == capacity = return $ Tuple' marr i
        | otherwise = do
            unsafeWriteIndex marr i x
            return $ Tuple' marr (i + 1)

    -- Trim the array to the number of elements actually written.
    extract (Tuple' marr len) = shrinkArray marr len >> return marr
-- Use Tuple' instead?
-- | Strict accumulator used by 'writeNUnsafe': the array being filled paired
-- with the index of the next slot to write. Both fields are strict and
-- marked for unpacking.
data ArrayUnsafe a = ArrayUnsafe
{-# UNPACK #-} !(Array a)
{-# UNPACK #-} !Int
-- | A variant of 'writeN' that performs no bounds check while writing. The
-- driver of the fold must not invoke the step function more than @n@ times;
-- doing so corrupts memory and crashes. It exists mainly because any
-- conditional in the step function blocks fusion, causing a 10x performance
-- slowdown.
--
-- /Internal/
{-# INLINE_NORMAL writeNUnsafe #-}
writeNUnsafe :: (MonadIO m, Prim a) => Int -> Fold m a (Array a)
writeNUnsafe n = Fold store begin finish

    where

    -- A negative request allocates an empty array.
    begin = do
        buf <- newArray (max n 0)
        return (ArrayUnsafe buf 0)

    -- Unconditional write: this is where the caller's contract matters.
    store (ArrayUnsafe buf next) el = do
        unsafeWriteIndex buf next el
        return (ArrayUnsafe buf (next + 1))

    finish (ArrayUnsafe buf used) = shrinkArray buf used >> return buf
-- | Build an array from at most @limit@ elements of a stream. The array is
-- allocated for @max limit 0@ elements up front and shrunk to the number of
-- elements actually written.
{-# INLINE_NORMAL fromStreamDN #-}
fromStreamDN :: (MonadIO m, Prim a) => Int -> D.Stream m a -> m (Array a)
fromStreamDN limit str = do
    marr <- newArray (max limit 0)
    -- Force the index before writing so that the counter stays evaluated.
    let store ix el =
            ix `seq` (unsafeWriteIndex marr ix el >> return (ix + 1))
    filled <- D.foldlM' store (return 0) (D.take limit str)
    shrinkArray marr filled
    return marr
-- | Drive a 'Fold' over a stream by handing its step, initial and extract
-- functions to 'D.foldlMx''.
{-# INLINE runFold #-}
runFold :: (Monad m) => Fold m a b -> D.Stream m a -> m b
runFold (Fold consume start finish) = D.foldlMx' consume start finish
-- | Collect a whole stream into an array by running the 'write' fold over it.
{-# INLINE fromStreamD #-}
fromStreamD :: (MonadIO m, Prim a) => D.Stream m a -> m (Array a)
fromStreamD stream = write `runFold` stream
-- | Build an array from at most @n@ elements of a list, by streaming the list
-- through 'fromStreamDN'.
{-# INLINABLE fromListNM #-}
fromListNM :: (MonadIO m, Prim a) => Int -> [a] -> m (Array a)
fromListNM n xs = fromStreamDN n (D.fromList xs)
-- | Build an array from all elements of a list, by streaming the list through
-- 'fromStreamD'.
{-# INLINABLE fromListM #-}
fromListM :: (MonadIO m, Prim a) => [a] -> m (Array a)
fromListM xs = fromStreamD (D.fromList xs)
-------------------------------------------------------------------------------
-- Combining
-------------------------------------------------------------------------------
-- | Splice two mutable arrays, producing an array that holds the elements of
-- the first followed by the elements of the second.
--
-- The first array is resized to make room, so it must not be used after this
-- call; use the returned array instead. The second array is only read.
{-# INLINE spliceTwo #-}
spliceTwo :: (MonadIO m, Prim a) => Array a -> Array a -> m (Array a)
spliceTwo a1 a2 = do
    l1 <- length a1
    l2 <- length a2
    a3 <- resizeArray a1 (l1 + l2)
    -- 'unsafeCopy' takes the destination first: append all l2 elements of a2
    -- (read from offset 0) into a3, starting right after a1's l1 elements.
    unsafeCopy a3 l1 a2 0 l2
    return a3
-------------------------------------------------------------------------------
-- Stream of Arrays
-------------------------------------------------------------------------------
-- | State machine used by 'fromStreamDArraysOf'. @s@ is the state of the
-- source stream and @a@ the element type.
data GroupState s a
-- Initial state: nothing allocated yet, holds the source stream state.
= GroupStart s
-- Filling a chunk: source state, the array being filled, and the number of
-- elements written to it so far.
| GroupBuffer s (Array a) Int
-- A full chunk is ready to be emitted; the source state is kept to continue.
| GroupYield (Array a) s
-- The source ended while filling a chunk that holds the given element count.
| GroupLastYield (Array a) Int
-- Everything has been emitted.
| GroupFinish
-- | @fromStreamDArraysOf n stream@ groups the input stream into a stream of
-- arrays of size n. The final array may hold fewer than n elements (it is
-- shrunk to the number of elements received); no array is emitted for an empty
-- tail. Calls 'error' when @n <= 0@ (raised when the stream is first stepped).
{-# INLINE_NORMAL fromStreamDArraysOf #-}
fromStreamDArraysOf ::
(MonadIO m, Prim a) => Int -> D.Stream m a -> D.Stream m (Array a)
-- fromStreamDArraysOf n str = D.groupsOf n (writeN n) str
fromStreamDArraysOf n (D.Stream step state) = D.Stream step' (GroupStart state)
where
{-# INLINE_LATE step' #-}
-- First step: validate the chunk size and allocate the first chunk.
step' _ (GroupStart st) = do
when (n <= 0) $
-- XXX we can pass the module string from the higher level API
-- NOTE(review): the module name in this message does not look like the
-- name of any module that includes this file -- confirm and fix.
error $
"Streamly.Internal.Memory.Mutable.Array.Types.fromStreamDArraysOf: the size of " ++
"arrays [" ++ show n ++ "] must be a natural number"
arr <- newArray n
return $ D.Skip (GroupBuffer st arr 0)
-- Fill the current chunk one element at a time until it holds n elements or
-- the source stops.
step' gst (GroupBuffer st arr i)
| i < n = do
r <- step (adaptState gst) st
case r of
D.Yield x s -> do
unsafeWriteIndex arr i x
return $ D.Skip (GroupBuffer s arr (i + 1))
D.Skip s -> return $ D.Skip (GroupBuffer s arr i)
D.Stop -> return $ D.Skip (GroupLastYield arr i)
| otherwise = return $ D.Skip (GroupYield arr st)
-- Emit the full chunk and allocate a fresh one for the next round.
step' _ (GroupYield arr st) = do
nArr <- newArray n
return $ D.Yield arr (GroupBuffer st nArr 0)
-- Source ended: drop an empty chunk, otherwise trim the partial chunk to the
-- number of elements written and emit it.
step' _ (GroupLastYield arr i)
| i == 0 = return D.Stop
| otherwise = do
shrinkArray arr i
return $ D.Yield arr GroupFinish
step' _ GroupFinish = return D.Stop
-- | State machine used by 'packArraysChunksOf'. @s@ is the state of the
-- source stream and @arr@ the array type being coalesced.
data SpliceState s arr
-- No array is buffered; holds the source stream state.
= SpliceInitial s
-- An array is buffered and may still be extended with further input.
| SpliceBuffering s arr
-- Emit the given array, then continue in the given state.
| SpliceYielding arr (SpliceState s arr)
-- The source has ended and everything has been emitted.
| SpliceFinish
-- XXX can use general grouping combinators to achieve this?
-- | Coalesce adjacent arrays in incoming stream to form bigger arrays of a
-- maximum specified size in bytes. Note that if a single array is bigger than
-- the specified size we do not split it to fit. When we coalesce multiple
-- arrays if the size would exceed the specified size we do not coalesce
-- therefore the actual array size may be less than the specified chunk size.
--
-- Calls 'error' when the size is not positive (raised when the stream is
-- first stepped).
--
-- /Internal/
{-# INLINE_NORMAL packArraysChunksOf #-}
packArraysChunksOf ::
(MonadIO m, Prim a)
=> Int
-> D.Stream m (Array a)
-> D.Stream m (Array a)
packArraysChunksOf n (D.Stream step state) =
D.Stream step' (SpliceInitial state)
where
{-# INLINE_LATE step' #-}
-- Nothing buffered yet: an array that already reaches the limit is emitted
-- as is, a smaller one becomes the buffer.
step' gst (SpliceInitial st) = do
when (n <= 0) $
-- XXX we can pass the module string from the higher level API
-- NOTE(review): the module name in this message does not look like the
-- name of any module that includes this file -- confirm and fix.
error $ "Streamly.Internal.Memory.Mutable.Array.Types.packArraysChunksOf: the size of "
++ "arrays [" ++ show n ++ "] must be a natural number"
r <- step gst st
case r of
D.Yield arr s -> do
len <- byteLength arr
return $
if len >= n
then D.Skip $ SpliceYielding arr (SpliceInitial s)
else D.Skip $ SpliceBuffering s arr
D.Skip s -> return $ D.Skip (SpliceInitial s)
D.Stop -> return $ D.Stop
-- An array is buffered: append the next array if the combined byte size
-- stays within the limit, otherwise emit the buffer and start buffering the
-- new array.
step' gst (SpliceBuffering st buf) = do
r <- step gst st
case r of
D.Yield arr s -> do
blen <- byteLength buf
alen <- byteLength arr
let len = blen + alen
if len > n
then return $
D.Skip (SpliceYielding buf (SpliceBuffering s arr))
else do
buf' <- spliceTwo buf arr
return $ D.Skip (SpliceBuffering s buf')
D.Skip s -> return $ D.Skip (SpliceBuffering s buf)
-- Source ended: flush whatever is buffered.
D.Stop -> return $ D.Skip (SpliceYielding buf SpliceFinish)
step' _ SpliceFinish = return D.Stop
step' _ (SpliceYielding arr next) = return $ D.Yield arr next
-- | Fold-side counterpart of 'packArraysChunksOf': buffers incoming arrays,
-- splicing them together, and hands a coalesced array to the supplied fold
-- once the buffered byte size reaches @n@. Each delivery runs the supplied
-- fold's step and extract and then restarts it from its initial state.
--
-- Calls 'error' when @n <= 0@.
{-# INLINE_NORMAL lpackArraysChunksOf #-}
lpackArraysChunksOf ::
       (MonadIO m, Prim a) => Int -> Fold m (Array a) () -> Fold m (Array a) ()
lpackArraysChunksOf n (Fold step1 initial1 extract1) =
    Fold step initial extract

    where

    -- State: an optional buffered array plus the state of the inner fold.
    initial = do
        when (n <= 0) $
            -- XXX we can pass the module string from the higher level API
            error $ "Streamly.Internal.Memory.Mutable.Array.Types.lpackArraysChunksOf: the size of "
                 ++ "arrays [" ++ show n ++ "] must be a natural number"
        r1 <- initial1
        return (Tuple' Nothing r1)

    -- Flush any buffered array into the inner fold before finishing it.
    extract (Tuple' Nothing r1) = extract1 r1
    extract (Tuple' (Just buf) r1) = do
        r <- step1 r1 buf
        extract1 r

    -- Nothing buffered: pass a large enough array straight through, buffer a
    -- smaller one.
    step (Tuple' Nothing r1) arr = do
        len <- byteLength arr
        if len >= n
            then do
                r <- step1 r1 arr
                extract1 r
                r1' <- initial1
                return (Tuple' Nothing r1')
            else return (Tuple' (Just arr) r1)

    -- Something buffered: always splice, then deliver once the combined size
    -- reaches the threshold.
    step (Tuple' (Just buf) r1) arr = do
        blen <- byteLength buf
        alen <- byteLength arr
        let len = blen + alen
        buf' <- spliceTwo buf arr
        if len >= n
            then do
                r <- step1 r1 buf'
                extract1 r
                r1' <- initial1
                return (Tuple' Nothing r1')
            else return (Tuple' (Just buf') r1)

View File

@ -0,0 +1,108 @@
#include "inline.hs"
-- |
-- Module : Streamly.Internal.Data.Array.Prim.Pinned
-- Copyright : (c) 2020 Composewell Technologies
--
-- License : BSD-3-Clause
-- Maintainer : streamly@composewell.com
-- Stability : experimental
-- Portability : GHC
--
module Streamly.Internal.Data.Array.Prim.Pinned
(
Array
-- * Construction
-- Pure List APIs
, A.fromListN
, A.fromList
-- Stream Folds
, fromStreamN
, fromStream
-- Monadic APIs
-- , newArray
, A.writeN -- drop new
, A.write -- full buffer
-- * Elimination
, A.toList
, toStream
, toStreamRev
, read
, unsafeRead
-- * Random Access
, length
, null
, last
-- , (!!)
, readIndex
, A.unsafeIndex
-- , readIndices
-- , readRanges
-- , readFrom -- read from a given position to the end of file
-- , readFromRev -- read from a given position to the beginning of file
-- , readTo -- read from beginning up to the given position
-- , readToRev -- read from end to the given position in file
-- , readFromTo
-- , readFromThenTo
-- , readChunksOfFrom
-- , ...
-- , writeIndex
-- , writeFrom -- start writing at the given position
-- , writeFromRev
-- , writeTo -- write from beginning up to the given position
-- , writeToRev
-- , writeFromTo
-- , writeFromThenTo
--
-- , writeChunksOfFrom
-- , ...
-- , writeIndex
-- , writeIndices
-- , writeRanges
-- -- * Search
-- , bsearch
-- , bsearchIndex
-- , find
-- , findIndex
-- , findIndices
-- -- * In-place mutation (for Mutable Array type)
-- , partitionBy
-- , shuffleBy
-- , foldtWith
-- , foldbWith
-- * Immutable Transformations
-- , streamTransform
-- * Folding Arrays
, streamFold
, fold
-- * Folds with Array as the container
-- , D.lastN
-- * Streaming array operations
, concat
, compact
)
where
import Streamly.Internal.Data.Array.Prim.Pinned.Types (Array(..), length)
import qualified Streamly.Internal.Data.Array.Prim.Pinned.Types as A
#include "Streamly/Internal/Data/Array/PrimInclude.hs"

View File

@ -0,0 +1,176 @@
{-# LANGUAGE UnboxedTuples #-}
#include "inline.hs"
-- |
-- Module : Streamly.Internal.Data.Array.Prim.Pinned.Mut.Types
-- Copyright : (c) 2020 Composewell Technologies
--
-- License : BSD-3-Clause
-- Maintainer : streamly@composewell.com
-- Stability : experimental
-- Portability : GHC
--
module Streamly.Internal.Data.Array.Prim.Pinned.Mut.Types
(
Array (..)
-- * Construction
, newArray
, newAlignedArray
, unsafeWriteIndex
, spliceTwo
, unsafeCopy
, fromListM
, fromListNM
, fromStreamDN
, fromStreamD
-- * Streams of arrays
, fromStreamDArraysOf
, packArraysChunksOf
, lpackArraysChunksOf
#if !defined(mingw32_HOST_OS)
-- , groupIOVecsOf
#endif
-- * Elimination
, unsafeReadIndex
, length
, byteLength
, writeN
, ArrayUnsafe(..)
, writeNUnsafe
, writeNAligned
, write
-- * Utilities
, resizeArray
, shrinkArray
, touchArray
, withArrayAsPtr
)
where
import GHC.IO (IO(..))
#include "Streamly/Internal/Data/Array/Prim/MutTypesInclude.hs"
-------------------------------------------------------------------------------
-- Allocation (Pinned)
-------------------------------------------------------------------------------
-- XXX we can use a single newArray routine which accepts an allocation
-- function which could be newByteArray#, newPinnedByteArray# or
-- newAlignedPinnedByteArray#. That function can go in the common include file.
--
-- | Allocate a fresh pinned array with room for the requested number of
-- elements. The contents of the array are left uninitialized.
--
-- This is an internal routine: a reference to the array must not be handed
-- out until the array has been written to and frozen.
{-# INLINE newArray #-}
newArray ::
       forall m a. (MonadIO m, Prim a)
    => Int
    -> m (Array a)
newArray (I# count#) = liftIO (primitive allocate)

    where

    -- Requested size converted from an element count to a byte count.
    nbytes# = count# *# sizeOf# (undefined :: a)

    allocate st0# =
        case newPinnedByteArray# nbytes# st0# of
            (# st1#, marr# #) -> (# st1#, Array marr# #)
-- Change order of args?
-- | Allocate a fresh pinned array with room for the requested number of
-- elements, aligned to the given alignment. The size is given first, the
-- alignment second.
{-# INLINE newAlignedArray #-}
newAlignedArray ::
       forall m a. (MonadIO m, Prim a)
    => Int -- size
    -> Int -- Alignment
    -> m (Array a)
newAlignedArray (I# count#) (I# align#) = liftIO (primitive allocate)

    where

    -- Requested size converted from an element count to a byte count.
    nbytes# = count# *# sizeOf# (undefined :: a)

    allocate st0# =
        case newAlignedPinnedByteArray# nbytes# align# st0# of
            (# st1#, marr# #) -> (# st1#, Array marr# #)
-- | Resize a (pinned) mutable array to the given size in elements.
--
-- * Same size: the original array is returned unchanged.
-- * Smaller size: the original array is shrunk in place and returned.
-- * Larger size: a new (pinned) array is allocated and the original contents
--   are copied into it.
{-# INLINE resizeArray #-}
resizeArray ::
       (MonadIO m, Prim a)
    => Array a
    -> Int -- ^ new size
    -> m (Array a)
resizeArray arr i = do
    len <- length arr
    case compare i len of
        EQ -> return arr
        LT -> shrinkArray arr i >> return arr
        GT -> do
            grown <- newArray i
            -- Destination first: copy all existing elements to the front.
            unsafeCopy grown 0 arr 0 len
            return grown
-------------------------------------------------------------------------------
-- Aligned Construction
-------------------------------------------------------------------------------
-- XXX we can also factor out common code in writeN and writeNAligned in the
-- same way as suggested above.
--
-- | @writeNAligned align n@ folds a maximum of @n@ elements from the input
-- into a pinned array allocated with the given alignment. Elements beyond the
-- first @n@ are ignored. A negative @n@ is treated as 0 and yields an empty
-- array. Note that the alignment is the first argument and the element limit
-- the second.
{-# INLINE_NORMAL writeNAligned #-}
writeNAligned ::
       (MonadIO m, Prim a)
    => Int
    -> Int
    -> Fold m a (Array a)
writeNAligned align limit = Fold step initial extract

    where

    -- Clamp once up front. Without this a negative limit would be passed to
    -- the allocator and the "i == limit" guard below would never fire, so
    -- every step would write out of bounds.
    capacity = max limit 0

    initial = do
        marr <- newAlignedArray capacity align
        return (marr, 0)

    step (marr, i) x
        | i == capacity = return (marr, i)
        | otherwise = do
            unsafeWriteIndex marr i x
            return (marr, i + 1)

    -- Trim the array to the number of elements actually written.
    extract (marr, len) = shrinkArray marr len >> return marr
-------------------------------------------------------------------------------
-- Mutation with pointers
-------------------------------------------------------------------------------
-- XXX This section can probably go in a common include file for pinned arrays.
-- Change name later.
-- | Address of the first byte of the array's payload. The mutable byte array
-- is reinterpreted as an immutable one with 'unsafeCoerce#' so that
-- 'byteArrayContents#' can be applied to it.
--
-- NOTE(review): the returned address is presumably only meaningful because
-- arrays in this module are allocated pinned, and only while the array is
-- kept alive (see 'touchArray') -- confirm.
{-# INLINE toPtr #-}
toPtr :: Array a -> Ptr a
toPtr (Array arr#) = Ptr (byteArrayContents# (unsafeCoerce# arr#))
-- | Apply 'touch#' to the array at this point in the IO sequence. Used by
-- 'withArrayAsPtr' after the pointer-based action has run.
{-# INLINE touchArray #-}
touchArray :: Array a -> IO ()
touchArray arr = IO keepAlive

    where

    keepAlive st0# =
        case touch# arr st0# of
            st1# -> (# st1#, () #)
-- | Run an IO action on a pointer to the array's contents, touching the array
-- afterwards, and return the action's result.
{-# INLINE withArrayAsPtr #-}
withArrayAsPtr :: Array a -> (Ptr a -> IO b) -> IO b
withArrayAsPtr arr action = action (toPtr arr) <* touchArray arr

View File

@ -0,0 +1,127 @@
{-# LANGUAGE UnboxedTuples #-}
#include "inline.hs"
-- |
-- Module : Streamly.Internal.Data.Array.Prim.Pinned.Types
-- Copyright : (c) 2020 Composewell Technologies
--
-- License : BSD-3-Clause
-- Maintainer : streamly@composewell.com
-- Stability : experimental
-- Portability : GHC
--
module Streamly.Internal.Data.Array.Prim.Pinned.Types
(
Array (..)
, unsafeFreeze
, unsafeFreezeWithShrink
-- , unsafeThaw
, defaultChunkSize
, nil
-- * Construction
, spliceTwo
, fromList
, fromListN
, fromStreamDN
, fromStreamD
-- * Streams of arrays
, fromStreamDArraysOf
, FlattenState (..) -- for inspection testing
, flattenArrays
, flattenArraysRev
, SpliceState (..) -- for inspection testing
, packArraysChunksOf
, lpackArraysChunksOf
#if !defined(mingw32_HOST_OS)
-- , groupIOVecsOf
#endif
, splitOn
, breakOn
-- * Elimination
, unsafeIndex
, byteLength
, length
, foldl'
, foldr
, foldr'
, foldlM'
, splitAt
, toStreamD
, toStreamDRev
, toStreamK
, toStreamKRev
, toList
-- , toArrayMinChunk
, writeN
, MA.ArrayUnsafe(..)
, writeNUnsafe
, write
, unlines
, toPtr
, touchArray
, withArrayAsPtr
)
where
import Foreign.C.Types (CSize(..))
import GHC.IO (IO(..))
import Foreign.Ptr (minusPtr, nullPtr, plusPtr)
import qualified Streamly.Internal.Data.Array.Prim.Pinned.Mut.Types as MA
#include "Streamly/Internal/Data/Array/Prim/TypesInclude.hs"
-------------------------------------------------------------------------------
-- Utility functions
-------------------------------------------------------------------------------
-- | Binding to C @memchr@ from @string.h@: scan the given number of bytes
-- starting at the pointer for the first occurrence of a byte. 'breakOn'
-- relies on the result being 'nullPtr' when the byte is not found. Imported
-- as an @unsafe@ call.
foreign import ccall unsafe "string.h memchr" c_memchr
:: Ptr Word8 -> Word8 -> CSize -> IO (Ptr Word8)
-------------------------------------------------------------------------------
-- Using as a Pointer
-------------------------------------------------------------------------------
-- Change name later.
-- | Pointer to the start of the array slice: the address of the underlying
-- byte array's payload advanced by the slice offset.
--
-- NOTE(review): 'plusPtr' advances by bytes, so this is only right if the
-- offset field is a byte offset (or the element size is one byte, as in
-- 'breakOn') -- confirm against the 'Array' definition.
{-# INLINE toPtr #-}
toPtr :: Array a -> Ptr a
toPtr (Array arr# off _) = Ptr (byteArrayContents# arr#) `plusPtr` off
-- | Apply 'touch#' to the underlying byte array at this point in the IO
-- sequence. Used by 'withArrayAsPtr' after the pointer-based action has run.
{-# INLINE touchArray #-}
touchArray :: Array a -> IO ()
touchArray (Array arr# _ _) = IO keepAlive

    where

    keepAlive st0# =
        case touch# arr# st0# of
            st1# -> (# st1#, () #)
-- | Run an IO action on a pointer to the array's contents, touching the array
-- afterwards, and return the action's result.
{-# INLINE withArrayAsPtr #-}
withArrayAsPtr :: Array a -> (Ptr a -> IO b) -> IO b
withArrayAsPtr arr action = action (toPtr arr) <* touchArray arr
-- | Split a byte array at the first occurrence of the separator byte, located
-- with C @memchr@. Returns the part before the separator and, if the
-- separator was found, 'Just' the part after it; otherwise the whole array
-- and 'Nothing'.
--
-- Drops the separator byte
{-# INLINE breakOn #-}
breakOn ::
       MonadIO m
    => Word8
    -> Array Word8
    -> m (Array Word8, Maybe (Array Word8))
breakOn sep arr@(Array arr# off len) =
    return $
        if hit == nullPtr
        then (arr, Nothing)
        else
            ( Array arr# off before
            , Just $ Array arr# (off + before + 1) after
            )

    where

    start = toPtr arr

    -- Null when the separator does not occur in the array.
    hit = unsafePerformIO $ c_memchr start sep (fromIntegral (byteLength arr))

    -- Number of bytes preceding the separator.
    before = hit `minusPtr` start

    -- Number of bytes following the separator.
    after = len - before - 1

View File

@ -0,0 +1,98 @@
{-# LANGUAGE UnboxedTuples #-}
#include "inline.hs"
-- |
-- Module : Streamly.Internal.Data.Array.Prim.Types
-- Copyright : (c) 2020 Composewell Technologies
--
-- License : BSD-3-Clause
-- Maintainer : streamly@composewell.com
-- Stability : experimental
-- Portability : GHC
--
module Streamly.Internal.Data.Array.Prim.Types
(
Array (..)
, unsafeFreeze
, unsafeFreezeWithShrink
-- , unsafeThaw
, defaultChunkSize
, nil
-- * Construction
, spliceTwo
, fromList
, fromListN
, fromStreamDN
, fromStreamD
-- * Streams of arrays
, fromStreamDArraysOf
, FlattenState (..) -- for inspection testing
, flattenArrays
, flattenArraysRev
, SpliceState (..) -- for inspection testing
, packArraysChunksOf
, lpackArraysChunksOf
#if !defined(mingw32_HOST_OS)
-- , groupIOVecsOf
#endif
, splitOn
, breakOn
-- * Elimination
, unsafeIndex
, byteLength
, length
, foldl'
, foldr
, foldr'
, foldlM'
, splitAt
, toStreamD
, toStreamDRev
, toStreamK
, toStreamKRev
, toList
-- , toArrayMinChunk
, writeN
, MA.ArrayUnsafe(..)
, writeNUnsafe
, write
, unlines
)
where
import qualified Streamly.Internal.Data.Array.Prim.Mut.Types as MA
#include "Streamly/Internal/Data/Array/Prim/TypesInclude.hs"
-- | Break a byte array at the first occurrence of the separator byte. The
-- separator itself is dropped.
--
-- When the separator is absent the result is the whole input paired with
-- 'Nothing'. Otherwise it is the slice before the separator paired with
-- 'Just' the slice after it. Both slices share the input's byte array.
--
-- Inefficient compared to Memory Array: the separator is located by folding
-- over every element of the array.
{-# INLINE breakOn #-}
breakOn ::
       MonadIO m
    => Word8
    -> Array Word8
    -> m (Array Word8, Maybe (Array Word8))
breakOn sep arr@(Array arr# off len) =
    case foldl' locate (Left 0) arr of
        Right sepIx ->
            return
                ( Array arr# off sepIx
                , Just $ Array arr# (off + sepIx + 1) (len - sepIx - 1))
        Left _ -> return (arr, Nothing)
  where
    -- 'Left' counts the bytes inspected so far; 'Right' holds the index of
    -- the first separator and is never changed once reached.
    locate (Left ix) x
        | x == sep = Right ix
        | otherwise = Left (ix + 1)
    locate found _ = found

View File

@ -0,0 +1,694 @@
-- Copyright : (c) 2020 Composewell Technologies
-- License : BSD-3-Clause
-- Maintainer : streamly@composewell.com
-- Stability : experimental
-- Portability : GHC
import Control.DeepSeq (NFData(..))
import Control.Monad (when)
import Control.Monad.IO.Class (MonadIO(..))
import Control.Monad.Primitive (PrimMonad(..), primitive_)
import Data.Primitive.Types (Prim(..), sizeOf)
#if __GLASGOW_HASKELL__ < 808
import Data.Semigroup (Semigroup(..))
#endif
import Data.Word (Word8)
import Streamly.Internal.Data.Strict (Tuple3'(..), Maybe'(..))
import Streamly.Internal.Data.Fold.Types (Fold(..))
import Streamly.Internal.Data.SVar (adaptState)
import System.IO.Unsafe (unsafePerformIO)
import Text.Read (readPrec, readListPrec, readListPrecDefault)
import qualified GHC.Exts as Exts
import qualified Prelude as P
import qualified Streamly.Internal.Data.Fold as FL
import qualified Streamly.Internal.Data.Stream.StreamD.Type as D
import qualified Streamly.Internal.Data.Stream.StreamK as K
import GHC.Exts hiding (fromListN, fromList, toList)
import Prelude hiding (length, unlines, foldr)
-------------------------------------------------------------------------------
-- Array Data Type
-------------------------------------------------------------------------------
-- | An immutable array of primitive elements, represented as a slice of an
-- unboxed byte array. The fields are, in order: the underlying 'ByteArray#',
-- the offset of the first element of the slice, and the number of elements in
-- the slice. 'unsafeIndex' and 'length' in this file treat both integers as
-- element counts, not byte counts.
data Array a = Array ByteArray# Int Int
-------------------------------------------------------------------------------
-- Utilities
-------------------------------------------------------------------------------
-- | Copy a run of elements from an immutable array into a mutable array. All
-- offsets and the count are in elements; they are converted to bytes using
-- the element size before the copy primop is invoked. The source offset is
-- relative to the start of the source slice.
--
-- Both arrays must fully contain the specified ranges, but this is not
-- checked. The two arrays must not be the same array in different states, but
-- this is not checked either.
{-# INLINE unsafeCopy #-}
unsafeCopy ::
       forall m a. (MonadIO m, Prim a)
    => MA.Array a -- ^ destination array
    -> Int -- ^ offset into destination array
    -> Array a -- ^ source array
    -> Int -- ^ offset into source array
    -> Int -- ^ number of elements to copy
    -> m ()
unsafeCopy (MA.Array dst#) (I# dstOff#) (Array src# (I# base#) _) (I# srcOff#) (I# count#) =
    liftIO $
        primitive_ $
            copyByteArray#
                src#
                (inBytes (base# +# srcOff#))
                dst#
                (inBytes dstOff#)
                (inBytes count#)
  where
    -- Scale an element count to a byte count.
    inBytes elems# = elems# *# (sizeOf# (undefined :: a))
-------------------------------------------------------------------------------
-- Basic Byte Array Operations
-------------------------------------------------------------------------------
-- | Turn a mutable array into an immutable one without copying. The result
-- starts at offset 0 and its length is whatever 'MA.length' reports for the
-- mutable array. The mutable array must not be modified afterwards.
{-# INLINE unsafeFreeze #-}
unsafeFreeze :: (Prim a, MonadIO m) => MA.Array a -> m (Array a)
unsafeFreeze marr@(MA.Array marr#) =
    liftIO $
        MA.length marr >>= \n ->
            primitive $ \st# ->
                case unsafeFreezeByteArray# marr# st# of
                    (# st1#, frozen# #) -> (# st1#, Array frozen# 0 n #)
-- | Shrink the mutable array with 'MA.shrinkArray' using the given size and
-- then freeze it without copying. The result starts at offset 0 and its
-- length is what 'MA.length' reports after the shrink.
{-# INLINE unsafeFreezeWithShrink #-}
unsafeFreezeWithShrink ::
       (Prim a, MonadIO m) => MA.Array a -> Int -> m (Array a)
unsafeFreezeWithShrink marr@(MA.Array marr#) n =
    liftIO $
        MA.shrinkArray marr n >> MA.length marr >>= \newLen ->
            primitive $ \st# ->
                case unsafeFreezeByteArray# marr# st# of
                    (# st1#, frozen# #) -> (# st1#, Array frozen# 0 newLen #)
{-
-- Should never be used in general
{-# INLINE unsafeThaw #-}
unsafeThaw :: MonadIO m => Array a -> m (MA.Array a)
unsafeThaw (Array arr#) =
primitive $ \s# -> (# s#, MA.Array (unsafeCoerce# arr#) #)
-}
-- | Read the element at the given position, counted from the start of the
-- slice. Unsafe because the index bounds are not checked.
{-# INLINE unsafeIndex #-}
unsafeIndex :: Prim a => Array a -> Int -> a
unsafeIndex (Array ba# (I# base#) _) (I# ix#) =
    indexByteArray# ba# (base# +# ix#)
-- | Pointer equality on two byte arrays: 'True' when both refer to the very
-- same heap object. Unsafe: relies on 'reallyUnsafePtrEquality#' after
-- coercing the unlifted arrays.
sameByteArray :: ByteArray# -> ByteArray# -> Bool
sameByteArray ba1 ba2 =
    isTrue#
        (reallyUnsafePtrEquality#
            (unsafeCoerce# ba1 :: ())
            (unsafeCoerce# ba2 :: ()))
-------------------------------------------------------------------------------
-- Chunk Size
-------------------------------------------------------------------------------
-- XXX move this section to mutable array module?

-- | Convert a size given in kilobytes (units of 1024 bytes) to bytes.
mkChunkSizeKB :: Int -> Int
mkChunkSizeKB n = n * 1024
-- | Default maximum buffer size in bytes, for reading from and writing to IO
-- devices. The value is exactly 32 * 1024 bytes, as computed by
-- 'mkChunkSizeKB'.
--
-- NOTE(review): an earlier version of this comment said the GHC allocation
-- overhead is subtracted so that the actual allocation is 32KB. As defined in
-- this file 'mkChunkSizeKB' only multiplies by 1024 and subtracts nothing.
defaultChunkSize :: Int
defaultChunkSize = mkChunkSizeKB 32
-------------------------------------------------------------------------------
-- Length
-------------------------------------------------------------------------------
-- XXX rename to byteCount?

-- | Size of the array slice in bytes: the element count multiplied by the
-- size of one element.
{-# INLINE byteLength #-}
byteLength :: forall a. Prim a => Array a -> Int
byteLength (Array _ _ count) = count * elemSize
  where
    elemSize = sizeOf (undefined :: a)
-- XXX Also, rename to elemCount
-- XXX I would prefer length to keep the API consistent
-- XXX Also, re-export sizeOf from Primitive

-- | Number of elements in the array slice.
{-# INLINE length #-}
length :: Array a -> Int
length arr =
    case arr of
        Array _ _ count -> count
-------------------------------------------------------------------------------
-- Construction
-------------------------------------------------------------------------------
-- | View part of an array as another array without copying. The start
-- position is relative to the start of the input slice; the third argument is
-- the element count of the result. Note that this is unsafe and does not
-- check the bounds.
slice :: Array a -> Int -> Int -> Array a
slice (Array ba# base _) start count = Array ba# (base + start) count
-- | The empty array, built by freezing a freshly allocated mutable array of
-- size zero.
nil :: Prim a => Array a
nil = unsafePerformIO (MA.newArray 0 >>= unsafeFreeze)
-- | A fold that collects its entire input into one array: the mutable-array
-- fold 'MA.write' followed by 'unsafeFreeze' on its result.
--
-- /Caution! Do not use this on infinite streams./
--
-- /Internal/
{-# INLINE_NORMAL write #-}
write :: (MonadIO m, Prim a) => Fold m a (Array a)
write = unsafeFreeze `FL.mapM` MA.write
-- | @writeN n@ folds at most @n@ elements of the input stream into an
-- 'Array'. Implemented as 'MA.writeN' followed by 'unsafeFreeze'.
--
-- /Internal/
{-# INLINE_NORMAL writeN #-}
writeN :: (MonadIO m, Prim a) => Int -> Fold m a (Array a)
writeN limit = unsafeFreeze `FL.mapM` MA.writeN limit
-- | Variant of 'writeN' that performs no bounds check while writing. The fold
-- driver must not call the step function more than @n@ times, otherwise it
-- will corrupt memory and crash. It exists mainly because any conditional in
-- the step function blocks fusion, causing a 10x performance slowdown.
--
-- /Internal/
{-# INLINE_NORMAL writeNUnsafe #-}
writeNUnsafe :: (MonadIO m, Prim a) => Int -> Fold m a (Array a)
writeNUnsafe limit = unsafeFreeze `FL.mapM` MA.writeNUnsafe limit
-- | Build an array from a direct-style stream using 'MA.fromStreamDN' with
-- the given limit, then freeze the result.
{-# INLINE_NORMAL fromStreamDN #-}
fromStreamDN :: (MonadIO m, Prim a) => Int -> D.Stream m a -> m (Array a)
fromStreamDN limit str = unsafeFreeze =<< MA.fromStreamDN limit str
-- | Build an array from a whole direct-style stream using 'MA.fromStreamD',
-- then freeze the result.
{-# INLINE fromStreamD #-}
fromStreamD :: (MonadIO m, Prim a) => D.Stream m a -> m (Array a)
fromStreamD str = unsafeFreeze =<< MA.fromStreamD str
-- | @fromStreamDArraysOf n stream@ groups the input stream into a stream of
-- arrays of size @n@. Each mutable chunk produced by
-- 'MA.fromStreamDArraysOf' is frozen as it is emitted.
{-# INLINE_NORMAL fromStreamDArraysOf #-}
fromStreamDArraysOf ::
       (MonadIO m, Prim a) => Int -> D.Stream m a -> D.Stream m (Array a)
fromStreamDArraysOf n str =
    D.mapM unsafeFreeze $ MA.fromStreamDArraysOf n str
-- XXX derive from MA.fromListN?

-- | Build an array from a list and a length argument, by running
-- 'MA.fromListNM' and freezing its result inside 'unsafePerformIO'.
{-# INLINE fromListN #-}
fromListN :: Prim a => Int -> [a] -> Array a
fromListN len xs = unsafePerformIO (unsafeFreeze =<< MA.fromListNM len xs)
-- XXX derive from MA.fromList?

-- | Build an array from a list. The list is traversed once to compute its
-- length and then handed to 'fromListN'.
{-# INLINE fromList #-}
fromList :: Prim a => [a] -> Array a
fromList xs =
    let n = P.length xs
     in fromListN n xs
-------------------------------------------------------------------------------
-- Combining
-------------------------------------------------------------------------------
-- | Concatenate two immutable arrays into a newly allocated immutable array.
-- Neither input is modified.
{-# INLINE spliceTwo #-}
spliceTwo :: (MonadIO m, Prim a) => Array a -> Array a -> m (Array a)
spliceTwo a1 a2 = do
    dst <- MA.newArray (len1 + len2)
    unsafeCopy dst 0 a1 0 len1
    unsafeCopy dst len1 a2 0 len2
    unsafeFreeze dst -- Use `unsafeFreezeWith off len`?
  where
    len1 = length a1
    len2 = length a2
-------------------------------------------------------------------------------
-- Elimination
-------------------------------------------------------------------------------
-- | Right fold over the elements in index order, in the cons/nil shape that
-- 'build' expects. Each element is evaluated before it is handed to the
-- combining function.
{-# INLINE_LATE toListFB #-}
toListFB :: Prim a => (a -> b -> b) -> b -> Array a -> b
toListFB c n arr = go 0
  where
    end = length arr
    go i
        | i == end = n
        | otherwise =
            let !x = unsafeIndex arr i
             in c x (go (i + 1))
-- | Convert an 'Array' into a list, expressed through 'build' and 'toListFB'.
--
-- /Internal/
{-# INLINE toList #-}
toList :: Prim a => Array a -> [a]
toList arr = build (\consFn nilVal -> toListFB consFn nilVal arr)
-------------------------------------------------------------------------------
-- Instances
-------------------------------------------------------------------------------
-- | Element-wise equality of two array slices.
--
-- An 'Array' is a slice (offset and length) of a byte array, so two values
-- that share a byte array are not necessarily equal. Slices produced by
-- 'slice' or @breakOn@ share storage but cover different ranges. The
-- pointer-equality shortcut is therefore only taken when the offsets and
-- lengths match as well.
instance (Eq a, Prim a) => Eq (Array a) where
    {-# INLINE (==) #-}
    a1@(Array ba1# off1 len1) == a2@(Array ba2# off2 len2)
        | len1 /= len2 = False
        -- Same storage and same range: equal without looking at elements.
        | sameByteArray ba1# ba2# && off1 == off2 = True
        | otherwise = loop (len1 - 1)
      where
        -- Compare from the last element down to index 0.
        loop !i
            | i < 0 = True
            | otherwise = unsafeIndex a1 i == unsafeIndex a2 i && loop (i - 1)
-- | Lexicographic ordering. Subject to change between major versions.
--
-- An 'Array' is a slice (offset and length) of a byte array, so sharing a
-- byte array does not by itself make two values equal. The pointer-equality
-- shortcut to 'EQ' is only taken when the offsets and lengths match as well.
instance (Ord a, Prim a) => Ord (Array a) where
    {-# INLINE compare #-}
    compare a1@(Array ba1# off1 len1) a2@(Array ba2# off2 len2)
        | sameByteArray ba1# ba2# && off1 == off2 && len1 == len2 = EQ
        | otherwise = loop 0
      where
        -- Only the common prefix can be compared element by element.
        sz = min len1 len2
        loop !i
            | i < sz =
                compare (unsafeIndex a1 i) (unsafeIndex a2 i) <> loop (i + 1)
            -- Equal common prefix: the shorter array orders first.
            | otherwise = compare len1 len2
-- | '<>' concatenates two arrays into a newly allocated array via
-- 'spliceTwo', run inside 'unsafePerformIO'.
instance Prim a => Semigroup (Array a) where
    -- XXX can't we use runST instead of inlineIO?
    -- XXX I plan to remove MonadIO and replace it with IO
    (<>) a b = unsafePerformIO $ spliceTwo a b
-- | The identity element is the empty array 'nil'; 'mappend' is the
-- 'Semigroup' concatenation.
instance Prim a => Monoid (Array a) where
mempty = nil
mappend = (<>)
-- | Fully evaluate an array. The elements live unboxed in the byte array and
-- need no forcing, but the constructor and its two lazy 'Int' fields (offset
-- and length) may still be thunks. A wildcard match would leave the whole
-- value unevaluated, so the constructor is matched and both fields are
-- forced.
instance NFData (Array a) where
    {-# INLINE rnf #-}
    rnf (Array _ off len) = off `seq` len `seq` ()
-- XXX check if this is compatible with Memory.Array?
-- XXX It isn't. I might prefer this Show instance though
-- XXX Memory.Array: showsPrec _ = shows . toList

-- | Renders as @fromListN \<length\> \<element list\>@, wrapped in
-- parentheses when the surrounding precedence is above 10.
instance (Show a, Prim a) => Show (Array a) where
    showsPrec d arr =
        showParen (d > 10) $
            showString "fromListN "
                . shows (length arr)
                . showChar ' '
                . shows (toList arr)
-- | String literals can be used as arrays of 'Char'. The instance head is
-- written with an equality constraint (@a ~ Char@) rather than as
-- @Array Char@. 'fromString' is this file's 'fromList'.
instance (a ~ Char) => IsString (Array a) where
{-# INLINE fromString #-}
fromString = fromList
-- GHC versions 8.0 and below cannot derive IsList
--
-- | List-literal support for arrays. The class methods are defined in terms
-- of the functions of the same name in this file: the right-hand sides refer
-- to those, because the unqualified GHC.Exts versions are hidden by the
-- import list.
instance Prim a => IsList (Array a) where
type (Item (Array a)) = a
{-# INLINE fromList #-}
fromList = fromList
{-# INLINE fromListN #-}
fromListN = fromListN
{-# INLINE toList #-}
toList = toList
-- | Parses an array from the textual form of a list of elements: the input
-- is read as a list and converted with 'fromList'.
--
-- NOTE(review): the 'Show' instance in this file prints
-- @fromListN \<n\> \<list\>@, which this parser does not accept, so
-- @read . show@ does not round-trip. Confirm which format is intended.
instance (Prim a, Read a, Show a) => Read (Array a) where
    {-# INLINE readPrec #-}
    readPrec = fmap fromList readPrec
    readListPrec = readListPrecDefault
-- XXX these folds can be made common with mutable arrays by defining a
-- unsafeIndex in the specific module?
-------------------------------------------------------------------------------
-- Folds
-------------------------------------------------------------------------------
-- | Lazy right-associated fold over the elements of an 'Array', visiting
-- indices from 0 upwards.
{-# INLINE foldr #-}
foldr :: Prim a => (a -> b -> b) -> b -> Array a -> b
foldr f z arr = go 0
  where
    !end = length arr
    go !i
        | i < end = f (unsafeIndex arr i) (go (i + 1))
        | otherwise = z
-- | Strict right-associated fold over the elements of an 'Array'. The
-- accumulator is forced at every step while walking from the last index down
-- to 0.
{-# INLINE foldr' #-}
foldr' :: Prim a => (a -> b -> b) -> b -> Array a -> b
foldr' f z0 arr = go (length arr - 1) z0
  where
    go !i !acc
        | i >= 0 = go (i - 1) (f (unsafeIndex arr i) acc)
        | otherwise = acc
-- | Strict left-associated fold over the elements of an 'Array'. The
-- accumulator is forced at every step while walking from index 0 upwards.
{-# INLINE foldl' #-}
foldl' :: Prim a => (b -> a -> b) -> b -> Array a -> b
foldl' f z0 arr = go 0 z0
  where
    !end = length arr
    go !i !acc
        | i >= end = acc
        | otherwise = go (i + 1) (f acc (unsafeIndex arr i))
-- | Strict left-associated monadic fold over the elements of an 'Array'. The
-- step action is run once per element, from index 0 upwards, and the
-- accumulator is forced before each step.
{-# INLINE foldlM' #-}
foldlM' :: (Prim a, Monad m) => (b -> a -> m b) -> b -> Array a -> m b
foldlM' f z0 arr = go 0 z0
  where
    !end = length arr
    go !i !acc
        | i >= end = return acc
        | otherwise = f acc (unsafeIndex arr i) >>= go (i + 1)
-------------------------------------------------------------------------------
-- Converting to streams
-------------------------------------------------------------------------------
-- | Stream the elements of an array in index order. The stream state is the
-- index of the next element to yield.
{-# INLINE_NORMAL toStreamD #-}
toStreamD :: (Prim a, Monad m) => Array a -> D.Stream m a
toStreamD arr = D.Stream step 0
  where
    {-# INLINE_LATE step #-}
    step _ i
        | i == length arr = return D.Stop
        | otherwise = return $ D.Yield (unsafeIndex arr i) (i + 1)
-- | Build a CPS-style stream from the elements of an array in index order.
-- Each element is evaluated before it is consed onto the stream.
{-# INLINE toStreamK #-}
toStreamK :: (K.IsStream t, Prim a) => Array a -> t m a
toStreamK arr = go 0
  where
    end = length arr
    go i
        | i == end = K.nil
        | otherwise =
            let !x = unsafeIndex arr i
             in K.cons x (go (i + 1))
-- | Stream the elements of an array from the last index down to the first.
-- The stream state is the index of the next element to yield.
{-# INLINE_NORMAL toStreamDRev #-}
toStreamDRev :: (Prim a, Monad m) => Array a -> D.Stream m a
toStreamDRev arr = D.Stream step (length arr - 1)
  where
    {-# INLINE_LATE step #-}
    step _ i
        | i < 0 = return D.Stop
        | otherwise = return $ D.Yield (unsafeIndex arr i) (i - 1)
-- | Build a CPS-style stream from the elements of an array, last element
-- first. Each element is evaluated before it is consed onto the stream.
{-# INLINE toStreamKRev #-}
toStreamKRev :: (K.IsStream t, Prim a) => Array a -> t m a
toStreamKRev arr = go (length arr - 1)
  where
    go i
        | i == -1 = K.nil
        | otherwise =
            let !x = unsafeIndex arr i
             in K.cons x (go (i - 1))
-------------------------------------------------------------------------------
-- Stream of Arrays
-------------------------------------------------------------------------------
-- | State of the stream transformers in this file that turn a stream of
-- arrays into a stream of elements.
--
-- * @OuterLoop s@: pull the next array from the source stream whose state is
--   @s@.
-- * @InnerLoop s arr len i@: emit elements of @arr@; as used in this file,
--   @len@ is the array's length and @i@ the index of the next element.
data FlattenState s a =
OuterLoop s
| InnerLoop s !(Array a) !Int !Int
-- | Flatten a stream of arrays into a stream of their elements, in order.
-- Empty arrays contribute nothing.
{-# INLINE_NORMAL flattenArrays #-}
flattenArrays :: (MonadIO m, Prim a) => D.Stream m (Array a) -> D.Stream m a
flattenArrays (D.Stream step state) = D.Stream step' (OuterLoop state)
  where
    {-# INLINE_LATE step' #-}
    step' gst (OuterLoop st) = do
        res <- step (adaptState gst) st
        return $ case res of
            D.Stop -> D.Stop
            D.Skip s -> D.Skip (OuterLoop s)
            D.Yield arr s ->
                case length arr of
                    0 -> D.Skip (OuterLoop s)
                    len -> D.Skip (InnerLoop s arr len 0)
    step' _ (InnerLoop st arr len i)
        -- Array exhausted: go back for the next one.
        | i == len = return $ D.Skip $ OuterLoop st
        | otherwise =
            return $ D.Yield (unsafeIndex arr i) (InnerLoop st arr len (i + 1))
-- | Flatten a stream of arrays into a stream of elements, emitting the
-- elements of each array from its last index down to its first. The arrays
-- themselves are still taken in stream order. Empty arrays contribute
-- nothing.
{-# INLINE_NORMAL flattenArraysRev #-}
flattenArraysRev :: (MonadIO m, Prim a) => D.Stream m (Array a) -> D.Stream m a
flattenArraysRev (D.Stream step state) = D.Stream step' (OuterLoop state)
  where
    {-# INLINE_LATE step' #-}
    step' gst (OuterLoop st) = do
        res <- step (adaptState gst) st
        return $ case res of
            D.Stop -> D.Stop
            D.Skip s -> D.Skip (OuterLoop s)
            D.Yield arr s ->
                case length arr of
                    0 -> D.Skip (OuterLoop s)
                    len -> D.Skip (InnerLoop s arr len (len - 1))
    step' _ (InnerLoop st arr len i)
        -- Walked past index 0: go back for the next array.
        | i == -1 = return $ D.Skip $ OuterLoop st
        | otherwise =
            return $ D.Yield (unsafeIndex arr i) (InnerLoop st arr len (i - 1))
-- | Flatten a stream of arrays into a stream of elements, emitting the given
-- separator element after the contents of every array. An empty array still
-- produces its separator.
{-# INLINE_NORMAL unlines #-}
unlines :: (MonadIO m, Prim a) => a -> D.Stream m (Array a) -> D.Stream m a
unlines sep (D.Stream step state) = D.Stream step' (OuterLoop state)
  where
    {-# INLINE_LATE step' #-}
    step' gst (OuterLoop st) = do
        res <- step (adaptState gst) st
        return $ case res of
            D.Stop -> D.Stop
            D.Skip s -> D.Skip (OuterLoop s)
            D.Yield arr s -> D.Skip (InnerLoop s arr (length arr) 0)
    step' _ (InnerLoop st arr len i)
        -- Array exhausted: terminate it with the separator.
        | i == len = return $ D.Yield sep $ OuterLoop st
        | otherwise =
            return $ D.Yield (unsafeIndex arr i) (InnerLoop st arr len (i + 1))
-- | Copy an immutable array into a pre-reserved mutable array, starting at
-- the given destination offset. Returns the offset just past the copied
-- elements. The user must ensure that there is enough space in the mutable
-- array; this is not checked.
{-# INLINE spliceInto #-}
spliceInto :: (MonadIO m, Prim a) => MA.Array a -> Int -> Array a -> m Int
spliceInto dst doff src@(Array _ _ count) =
    unsafeCopy dst doff src 0 count >> return (doff + count)
-- | State of the array-coalescing stream transformer in this file.
--
-- * @SpliceInitial s@: no buffer is held; pull from the source state @s@.
-- * @SpliceBuffering s buf@: a buffer @buf@ is being filled.
-- * @SpliceYielding arr next@: emit @arr@, then continue in state @next@.
-- * @SpliceFinish@: the stream is done.
data SpliceState s arr1 arr2
= SpliceInitial s
| SpliceBuffering s arr2
| SpliceYielding arr1 (SpliceState s arr1 arr2)
| SpliceFinish
-- | Coalesce adjacent arrays in incoming stream to form bigger arrays of a
-- maximum specified size in bytes. Note that if a single array is bigger than
-- the specified size we do not split it to fit. When we coalesce multiple
-- arrays if the size would exceed the specified size we do not coalesce
-- therefore the actual array size may be less than the specified chunk size.
--
-- When an incoming array does not fit into the current buffer, the buffered
-- data is emitted and the incoming array starts a new run. It is emitted
-- directly if it is at least one chunk long, otherwise it is copied into a
-- freshly allocated buffer. A buffer is never written to after it has been
-- frozen and emitted, and no input array is dropped.
--
-- /Internal/
{-# INLINE_NORMAL packArraysChunksOf #-}
packArraysChunksOf ::
       forall m a. (MonadIO m, Prim a)
    => Int
    -> D.Stream m (Array a)
    -> D.Stream m (Array a)
packArraysChunksOf n (D.Stream step state) =
    D.Stream step' (SpliceInitial state)
  where
    -- Chunk capacity in elements.
    nElem = n `quot` sizeOf (undefined :: a)

    -- State that starts a new run with @arr@ as its first array, @s@ being
    -- the source state to continue from.
    startWith arr s
        | length arr >= nElem =
            return $ SpliceYielding arr (SpliceInitial s)
        | otherwise = do
            buf <- MA.newArray nElem
            noff <- spliceInto buf 0 arr
            return $ SpliceBuffering s (buf, noff)

    {-# INLINE_LATE step' #-}
    step' gst (SpliceInitial st) = do
        when (n <= 0) $
            -- XXX we can pass the module string from the higher level API
            error $ "Streamly.Internal.Memory.Array.Types.packArraysChunksOf: the size of "
                 ++ "arrays [" ++ show n ++ "] must be a natural number"
        r <- step gst st
        case r of
            D.Yield arr s -> do
                next <- startWith arr s
                return $ D.Skip next
            D.Skip s -> return $ D.Skip (SpliceInitial s)
            D.Stop -> return $ D.Stop
    step' gst (SpliceBuffering st arr2@(buf, boff)) = do
        r <- step gst st
        case r of
            D.Yield arr s ->
                if boff + length arr > nElem
                then do
                    -- The buffer cannot take @arr@: emit what has been
                    -- buffered and begin a new run with @arr@, so that the
                    -- frozen buffer is not reused.
                    nArr <- unsafeFreeze buf
                    next <- startWith arr s
                    return $ D.Skip (SpliceYielding (slice nArr 0 boff) next)
                else do
                    noff <- spliceInto buf boff arr
                    return $ D.Skip (SpliceBuffering s (buf, noff))
            D.Skip s -> return $ D.Skip (SpliceBuffering s arr2)
            D.Stop -> do
                -- Source exhausted: flush the partially filled buffer.
                nArr <- unsafeFreeze buf
                return $ D.Skip (SpliceYielding (slice nArr 0 boff) SpliceFinish)
    step' _ SpliceFinish = return D.Stop
    step' _ (SpliceYielding arr next) = return $ D.Yield arr next
-- | Fold-side counterpart of 'packArraysChunksOf': coalesce the arrays fed to
-- the fold into bigger arrays before passing them on to the supplied fold.
-- The size argument is in bytes.
--
-- An array that is at least one chunk long is passed on directly when no
-- buffer is held. Smaller arrays are accumulated in a buffer of one chunk.
-- When an incoming array does not fit into the remaining buffer space, the
-- buffered data and the incoming array are spliced into a new array, which is
-- passed on. The buffer is never written past its capacity.
--
-- Each time an array is passed on, the supplied fold is stepped once,
-- extracted, and started again from its initial state.
{-# INLINE_NORMAL lpackArraysChunksOf #-}
lpackArraysChunksOf ::
       forall m a. (MonadIO m, Prim a)
    => Int
    -> Fold m (Array a) ()
    -> Fold m (Array a) ()
lpackArraysChunksOf n (Fold step1 initial1 extract1) =
    Fold step initial extract
  where
    -- Chunk capacity in elements.
    nElem = n `quot` sizeOf (undefined :: a)

    initial = do
        when (n <= 0) $
            -- XXX we can pass the module string from the higher level API
            error $ "Streamly.Internal.Memory.Array.Types.packArraysChunksOf: the size of "
                 ++ "arrays [" ++ show n ++ "] must be a natural number"
        r1 <- initial1
        return (Tuple3' Nothing' 0 r1)

    extract (Tuple3' Nothing' _ r1) = extract1 r1
    extract (Tuple3' (Just' buf) boff r1) = do
        -- Flush whatever is still buffered before finishing.
        nArr <- unsafeFreeze buf
        r <- step1 r1 (slice nArr 0 boff)
        extract1 r

    -- Pass one array to the inner fold, finish it and restart it with no
    -- buffer held.
    flush r1 arr = do
        r <- step1 r1 arr
        extract1 r
        r1' <- initial1
        return (Tuple3' Nothing' 0 r1')

    step (Tuple3' Nothing' _ r1) arr =
        if length arr >= nElem
        then flush r1 arr
        else do
            buf <- MA.newArray nElem
            noff <- spliceInto buf 0 arr
            return (Tuple3' (Just' buf) noff r1)
    step (Tuple3' (Just' buf) boff r1) arr
        | noff > nElem = do
            -- @arr@ does not fit into the remaining space. Copying it into
            -- @buf@ would write past the end of the buffer, so build a new
            -- array holding the buffered data followed by @arr@.
            frozen <- unsafeFreeze buf
            joined <- spliceTwo (slice frozen 0 boff) arr
            flush r1 joined
        | otherwise = do
            _ <- spliceInto buf boff arr
            if noff >= nElem
            then do
                nArr <- unsafeFreeze buf
                flush r1 (slice nArr 0 noff)
            else return (Tuple3' (Just' buf) noff r1)
      where
        -- Fill level of the buffer once @arr@ has been appended.
        noff = boff + length arr
-- | State of the separator-splitting stream transformer in this file.
--
-- * @Initial s@: nothing is pending; pull from the source state @s@.
-- * @Buffering s arr@: @arr@ holds data with no separator seen yet; more
--   input is needed.
-- * @Splitting s arr@: @arr@ is leftover input that still has to be searched
--   for separators.
-- * @Yielding arr next@: emit @arr@, then continue in state @next@.
-- * @Finishing@: the stream is done.
data SplitState s arr
= Initial s
| Buffering s arr
| Splitting s arr
| Yielding arr (SplitState s arr)
| Finishing
-- | Split a stream of byte arrays on a separator byte. The separator is
-- dropped, and all the data between two separators is coalesced into a single
-- array, even when it spans several input arrays.
--
-- /Internal/
{-# INLINE_NORMAL splitOn #-}
splitOn ::
       MonadIO m
    => Word8
    -> D.Stream m (Array Word8)
    -> D.Stream m (Array Word8)
splitOn byte (D.Stream step state) = D.Stream step' (Initial state)
  where
    {-# INLINE_LATE step' #-}
    -- Nothing pending: fetch an array and look for a separator in it.
    step' gst (Initial st) = do
        res <- step gst st
        case res of
            D.Stop -> return $ D.Stop
            D.Skip s -> return $ D.Skip (Initial s)
            D.Yield arr s -> do
                (pre, rest) <- breakOn byte arr
                return $ case rest of
                    Just post -> D.Skip (Yielding pre (Splitting s post))
                    Nothing -> D.Skip (Buffering s pre)
    -- A partial segment is pending: extend it with the next array.
    step' gst (Buffering st buf) = do
        res <- step gst st
        case res of
            D.Stop -> return $
                if byteLength buf == 0
                then D.Stop
                else D.Skip (Yielding buf Finishing)
            D.Skip s -> return $ D.Skip (Buffering s buf)
            D.Yield arr s -> do
                (pre, rest) <- breakOn byte arr
                joined <- spliceTwo buf pre
                return $ case rest of
                    Just post -> D.Skip (Yielding joined (Splitting s post))
                    Nothing -> D.Skip (Buffering s joined)
    -- Leftover input after a separator: keep splitting it.
    step' _ (Splitting st buf) = do
        (pre, rest) <- breakOn byte buf
        return $ case rest of
            Just post -> D.Skip $ Yielding pre (Splitting st post)
            Nothing -> D.Skip $ Buffering st pre
    step' _ (Yielding arr next) = return $ D.Yield arr next
    step' _ Finishing = return $ D.Stop

View File

@ -0,0 +1,185 @@
-- Copyright : (c) 2020 Composewell Technologies
-- License : BSD-3-Clause
-- Maintainer : streamly@composewell.com
-- Stability : experimental
-- Portability : GHC
import Control.Monad (when)
import Control.Monad.IO.Class (MonadIO(..))
import Data.Primitive.Types (Prim(..))
import Streamly.Internal.Data.Fold.Types (Fold(..))
import Streamly.Internal.Data.Unfold.Types (Unfold(..))
import Streamly.Internal.Data.Stream.Serial (SerialT)
import Streamly.Internal.Data.Stream.StreamK.Type (IsStream)
import qualified Streamly.Internal.Data.Stream.Prelude as P
import qualified Streamly.Internal.Data.Stream.Serial as Serial
import qualified Streamly.Internal.Data.Stream.StreamD as D
import qualified Streamly.Internal.Data.Stream.StreamK as K
import Prelude hiding (length, null, last, map, (!!), read, concat)
-------------------------------------------------------------------------------
-- Construction
-------------------------------------------------------------------------------
-- | Create an 'Array' from the first N elements of a stream. The array is
-- allocated to size N, if the stream terminates before N elements then the
-- array may hold less than N elements.
--
-- Calls 'error' when the requested count is negative. The message names this
-- function, so the failure can be attributed to the right call.
--
-- /Internal/
{-# INLINE fromStreamN #-}
fromStreamN :: (MonadIO m, Prim a) => Int -> SerialT m a -> m (Array a)
fromStreamN n m = do
    when (n < 0) $ error "fromStreamN: negative write count specified"
    A.fromStreamDN n $ D.toStreamD m
-- | Create an 'Array' from a stream. This is useful when we want to create a
-- single array from a stream of unknown size. 'writeN' is at least twice
-- as efficient when the size is already known.
--
-- Note that if the input stream is too large memory allocation for the array
-- may fail. When the stream size is not known, `arraysOf` followed by
-- processing of individual arrays in the resulting stream should be preferred.
--
-- Implemented by running the 'A.write' fold over the stream.
--
-- /Internal/
{-# INLINE fromStream #-}
fromStream :: (MonadIO m, Prim a) => SerialT m a -> m (Array a)
fromStream = P.runFold A.write
-- write m = A.fromStreamD $ D.toStreamD m
-------------------------------------------------------------------------------
-- Elimination
-------------------------------------------------------------------------------
-- | Convert an 'Array' into a stream. The direct-style stream from
-- 'A.toStreamD' is converted to the requested stream type with
-- 'D.fromStreamD'.
--
-- /Internal/
{-# INLINE_EARLY toStream #-}
toStream :: (MonadIO m, K.IsStream t, Prim a) => Array a -> t m a
toStream = D.fromStreamD . A.toStreamD
-- XXX add fallback to StreamK rule
-- {-# RULES "Streamly.Array.read fallback to StreamK" [1]
-- forall a. S.readK (read a) = K.fromArray a #-}
-- | Convert an 'Array' into a stream in reverse order. The direct-style
-- stream from 'A.toStreamDRev' is converted to the requested stream type with
-- 'D.fromStreamD'.
--
-- /Internal/
{-# INLINE_EARLY toStreamRev #-}
toStreamRev :: (MonadIO m, IsStream t, Prim a) => Array a -> t m a
toStreamRev = D.fromStreamD . A.toStreamDRev
-- XXX add fallback to StreamK rule
-- {-# RULES "Streamly.Array.readRev fallback to StreamK" [1]
-- forall a. S.toStreamK (readRev a) = K.revFromArray a #-}
-- | Unfold an array into a stream. The unfold state is the remaining slice
-- of the array: each step yields its first element and continues with the
-- slice advanced by one, stopping when the slice is empty.
--
-- @since 0.7.0
{-# INLINE_NORMAL read #-}
read :: (MonadIO m, Prim a) => Unfold m (Array a) a
read = Unfold step inject
  where
    inject = return
    {-# INLINE_LATE step #-}
    step arr@(Array arr# off len)
        | len == 0 = return D.Stop
        | otherwise =
            let !x = A.unsafeIndex arr 0
             in return $ D.Yield x (Array arr# (off + 1) (len - 1))
-- | Unfold an array into a stream without checking for the end of the array.
-- The user is responsible for terminating the stream within the array bounds.
-- Intended for high performance applications where the end condition can be
-- determined by a terminating fold.
--
-- The following might not be true, now that the representation changed.
-- Written in the hope that it may be faster than "read", however, in the case
-- for which this was written, "read" proves to be faster even though the core
-- generated with unsafeRead looks simpler.
--
-- /Internal/
--
{-# INLINE_NORMAL unsafeRead #-}
unsafeRead :: (MonadIO m, Prim a) => Unfold m (Array a) a
unsafeRead = Unfold step inject
  where
    inject = return
    {-# INLINE_LATE step #-}
    step arr@(Array arr# off len) =
        -- The element is evaluated before it is yielded.
        x `seq` return (D.Yield x (Array arr# (off + 1) (len - 1)))
      where
        x = A.unsafeIndex arr 0
-- | Test whether the array has no elements.
--
-- > null arr = length arr == 0
--
-- /Internal/
{-# INLINE null #-}
null :: Array a -> Bool
null arr = 0 == length arr
-------------------------------------------------------------------------------
-- Folds
-------------------------------------------------------------------------------
-- | Fold an array using a 'Fold', by running the fold over the array's
-- elements viewed as a serial stream.
--
-- /Internal/
{-# INLINE fold #-}
fold :: forall m a b. (MonadIO m, Prim a) => Fold m a b -> Array a -> m b
fold f arr = P.runFold f elems
  where
    elems :: Serial.SerialT m a
    elems = toStream arr
-- | Fold an array using a stream fold operation: the array is converted to a
-- serial stream and handed to the supplied function.
--
-- /Internal/
{-# INLINE streamFold #-}
streamFold :: (MonadIO m, Prim a) => (SerialT m a -> m b) -> Array a -> m b
streamFold f arr = f $ toStream arr
-------------------------------------------------------------------------------
-- Random reads
-------------------------------------------------------------------------------
-- | /O(1)/ Lookup the element at the given index, starting from 0. Returns
-- 'Nothing' when the index is negative or not smaller than the array length.
--
-- /Internal/
{-# INLINE readIndex #-}
readIndex :: Prim a => Array a -> Int -> Maybe a
readIndex arr i
    | i >= 0 && i < length arr = Just (A.unsafeIndex arr i)
    | otherwise = Nothing
-- | The last element of the array, or 'Nothing' for an empty array.
--
-- > last arr = readIndex arr (length arr - 1)
--
-- /Internal/
{-# INLINE last #-}
last :: Prim a => Array a -> Maybe a
last arr =
    let lastIx = length arr - 1
     in readIndex arr lastIx
-------------------------------------------------------------------------------
-- Array stream operations
-------------------------------------------------------------------------------
-- | Convert a stream of arrays into a stream of their elements, by unfolding
-- every array with 'read'.
--
-- Same as the following but more efficient:
--
-- > concat = S.concatMap A.read
--
-- /Internal/
{-# INLINE concat #-}
concat :: (IsStream t, MonadIO m, Prim a) => t m (Array a) -> t m a
-- concat m = D.fromStreamD $ A.flattenArrays (D.toStreamD m)
-- concat m = D.fromStreamD $ D.concatMap A.toStreamD (D.toStreamD m)
concat m = D.fromStreamD (D.concatMapU read (D.toStreamD m))
-- | Coalesce adjacent arrays in incoming stream to form bigger arrays of a
-- maximum specified size in bytes. A serial-stream wrapper around
-- 'A.packArraysChunksOf'.
--
-- /Internal/
{-# INLINE compact #-}
compact ::
       (MonadIO m, Prim a) => Int -> SerialT m (Array a) -> SerialT m (Array a)
compact n xs = D.fromStreamD (A.packArraysChunksOf n (D.toStreamD xs))

View File

@ -1,201 +0,0 @@
{-# OPTIONS_GHC -fno-warn-orphans #-}
#include "inline.hs"
-- |
-- Module : Streamly.Internal.Data.Prim.Array
-- Copyright : (c) 2019 Composewell Technologies
--
-- License : BSD-3-Clause
-- Maintainer : streamly@composewell.com
-- Stability : experimental
-- Portability : GHC
--
module Streamly.Internal.Data.Prim.Array
(
-- XXX should it be just Array instead? We should be able to replace one
-- array type with another easily.
PrimArray(..)
-- XXX Prim should be exported from Data.Prim module?
, Prim(..)
, foldl'
, foldr
, length
, writeN
, write
, toStreamD
, toStreamDRev
, toStream
, toStreamRev
, read
, readSlice
, fromListN
, fromList
, fromStreamDN
, fromStreamD
, fromStreamN
, fromStream
, streamFold
, fold
)
where
import Prelude hiding (foldr, length, read)
import Control.DeepSeq (NFData(..))
import Control.Monad (when)
import Control.Monad.IO.Class (liftIO, MonadIO)
import GHC.IO (unsafePerformIO)
import Data.Primitive.Types (Prim(..))
import Streamly.Internal.Data.Prim.Array.Types
import Streamly.Internal.Data.Unfold.Types (Unfold(..))
import Streamly.Internal.Data.Fold.Types (Fold(..))
import Streamly.Internal.Data.Stream.StreamK.Type (IsStream)
import Streamly.Internal.Data.Stream.Serial (SerialT)
import qualified Streamly.Internal.Data.Stream.StreamD as D
-- | Stream the elements of a 'PrimArray' in index order. The stream state is
-- the index of the next element to yield.
{-# INLINE_NORMAL toStreamD #-}
toStreamD :: (Prim a, Monad m) => PrimArray a -> D.Stream m a
toStreamD arr = D.Stream step 0
  where
    {-# INLINE_LATE step #-}
    step _ i
        | i == sizeofPrimArray arr = return D.Stop
        | otherwise = return $ D.Yield (indexPrimArray arr i) (i + 1)
-- | Number of elements in the array; an alias for 'sizeofPrimArray'.
{-# INLINE length #-}
length :: Prim a => PrimArray a -> Int
length = sizeofPrimArray
-- | Stream the elements of a 'PrimArray' from the last index down to the
-- first. The stream state is the index of the next element to yield.
{-# INLINE_NORMAL toStreamDRev #-}
toStreamDRev :: (Prim a, Monad m) => PrimArray a -> D.Stream m a
toStreamDRev arr = D.Stream step (sizeofPrimArray arr - 1)
  where
    {-# INLINE_LATE step #-}
    step _ i
        | i < 0 = return D.Stop
        | otherwise = return $ D.Yield (indexPrimArray arr i) (i - 1)
-- | Left fold over the elements of the array; an alias for
-- 'foldlPrimArray''.
{-# INLINE_NORMAL foldl' #-}
foldl' :: Prim a => (b -> a -> b) -> b -> PrimArray a -> b
foldl' = foldlPrimArray'
-- | Right fold over the elements of the array; an alias for
-- 'foldrPrimArray'.
{-# INLINE_NORMAL foldr #-}
foldr :: Prim a => (a -> b -> b) -> b -> PrimArray a -> b
foldr = foldrPrimArray
-- writeN n = S.evertM (fromStreamDN n)

-- | @writeN n@ folds at most @n@ elements of the input into a 'PrimArray'.
-- Input beyond the first @n@ elements is ignored.
--
-- The buffer is allocated for @n@ elements up front and shrunk to the number
-- of elements actually written before it is frozen. The result therefore
-- never exposes uninitialised trailing elements when the input is shorter
-- than @n@. A negative @n@ is treated as zero.
{-# INLINE_NORMAL writeN #-}
writeN :: (MonadIO m, Prim a) => Int -> Fold m a (PrimArray a)
writeN limit = Fold step initial extract
  where
    -- Never allocate, or write into, a negative-sized buffer.
    capacity = max limit 0

    initial = do
        marr <- liftIO $ newPrimArray capacity
        return (marr, 0)

    step (marr, i) x
        | i >= capacity = return (marr, i)
        | otherwise = do
            liftIO $ writePrimArray marr i x
            return (marr, i + 1)

    extract (marr, i) =
        liftIO $ do
            -- Drop the slots that were never written.
            shrinkMutablePrimArray marr i
            unsafeFreezePrimArray marr
-- | Fold the whole input into a 'PrimArray'. The buffer starts empty and its
-- capacity is doubled (to at least one element) whenever it is full. At the
-- end it is shrunk to the number of elements written and frozen.
{-# INLINE_NORMAL write #-}
write :: (MonadIO m, Prim a) => Fold m a (PrimArray a)
write = Fold step initial extract
  where
    initial = do
        marr <- liftIO $ newPrimArray 0
        return (marr, 0, 0)

    step (marr, i, capacity) x = do
        (target, cap) <-
            if i == capacity
            then grow
            else return (marr, capacity)
        liftIO $ writePrimArray target i x
        return (target, i + 1, cap)
      where
        -- Buffer is full: move to one of twice the capacity.
        grow = do
            let newCapacity = max (capacity * 2) 1
            newMarr <- liftIO $ resizeMutablePrimArray marr newCapacity
            return (newMarr, newCapacity)

    extract (marr, len, _) =
        liftIO (shrinkMutablePrimArray marr len)
            >> liftIO (unsafeFreezePrimArray marr)
-- | Create a 'PrimArray' from at most the first @limit@ elements of a
-- direct-style stream. A negative limit allocates an empty buffer.
--
-- The buffer is allocated for @limit@ elements and shrunk to the number of
-- elements actually written before it is frozen. The result therefore never
-- exposes uninitialised trailing elements when the stream is shorter than
-- @limit@.
{-# INLINE_NORMAL fromStreamDN #-}
fromStreamDN :: (MonadIO m, Prim a) => Int -> D.Stream m a -> m (PrimArray a)
fromStreamDN limit str = do
    marr <- liftIO $ newPrimArray (max limit 0)
    -- The fold result is the number of elements written.
    written <-
        D.foldlM'
            (\i x -> i `seq` liftIO (writePrimArray marr i x) >> return (i + 1))
            (return 0) $
        D.take limit str
    liftIO $ do
        shrinkMutablePrimArray marr written
        unsafeFreezePrimArray marr
-- | Create a 'PrimArray' from a whole direct-style stream by running the
-- 'write' fold over it.
{-# INLINE fromStreamD #-}
fromStreamD :: (MonadIO m, Prim a) => D.Stream m a -> m (PrimArray a)
fromStreamD = D.runFold write
-- | Create a 'PrimArray' from a list, passing the given count to
-- 'fromStreamDN' as the element limit. Runs inside 'unsafePerformIO'.
{-# INLINABLE fromListN #-}
fromListN :: Prim a => Int -> [a] -> PrimArray a
fromListN n xs = unsafePerformIO (fromStreamDN n (D.fromList xs))
-- | Create a 'PrimArray' from all the elements of a list, via 'fromStreamD'.
-- Runs inside 'unsafePerformIO'.
{-# INLINABLE fromList #-}
fromList :: Prim a => [a] -> PrimArray a
fromList xs = unsafePerformIO (fromStreamD (D.fromList xs))
-- | Evaluation is implemented as a left fold over the array that ignores
-- every element and produces unit.
instance Prim a => NFData (PrimArray a) where
    {-# INLINE rnf #-}
    rnf = foldl' (const (const ())) ()
-- | Create a 'PrimArray' from at most the first @n@ elements of a serial
-- stream. Calls 'error' when @n@ is negative.
{-# INLINE fromStreamN #-}
fromStreamN :: (MonadIO m, Prim a) => Int -> SerialT m a -> m (PrimArray a)
fromStreamN n m =
    when (n < 0) (error "fromStreamN: negative write count specified")
        >> fromStreamDN n (D.toStreamD m)
-- | Create a 'PrimArray' from all the elements of a serial stream.
{-# INLINE fromStream #-}
fromStream :: (MonadIO m, Prim a) => SerialT m a -> m (PrimArray a)
fromStream m = fromStreamD (D.toStreamD m)
-- | Convert a 'PrimArray' into a stream of its elements in index order, by
-- converting the direct-style stream from 'toStreamD'.
{-# INLINE_EARLY toStream #-}
toStream :: (Prim a, Monad m, IsStream t) => PrimArray a -> t m a
toStream = D.fromStreamD . toStreamD
-- | Convert a 'PrimArray' into a stream of its elements in reverse index
-- order, by converting the direct-style stream from 'toStreamDRev'.
{-# INLINE_EARLY toStreamRev #-}
toStreamRev :: (Prim a, Monad m, IsStream t) => PrimArray a -> t m a
toStreamRev = D.fromStreamD . toStreamDRev
-- | Fold a 'PrimArray' using a 'Fold', by running the fold over the array's
-- direct-style stream.
{-# INLINE fold #-}
fold :: (Prim a, Monad m) => Fold m a b -> PrimArray a -> m b
fold f arr = D.runFold f $ toStreamD arr
-- | Fold a 'PrimArray' using a stream fold operation: the array is converted
-- to a serial stream and handed to the supplied function.
{-# INLINE streamFold #-}
streamFold :: (Prim a, Monad m) => (SerialT m a -> m b) -> PrimArray a -> m b
streamFold f arr = f $ toStream arr
-- | Unfold a 'PrimArray' into a stream of its elements. The unfold state is
-- the array paired with the index of the next element to yield.
{-# INLINE_NORMAL read #-}
read :: (Prim a, Monad m) => Unfold m (PrimArray a) a
read = Unfold step inject
  where
    inject arr = return (arr, 0)
    step (arr, i)
        | i == length arr = return D.Stop
        | otherwise = return $ D.Yield (indexPrimArray arr i) (arr, i + 1)
-- | @readSlice off len@ unfolds the @len@ elements of a 'PrimArray' starting
-- at index @off@. The slice is clipped to the array bounds.
--
-- * A slice that extends past the end stops at the last element.
-- * An offset at or past the end yields nothing. The stop test uses @>=@
--   because the start index can already be beyond the end index.
-- * A negative offset starts at index 0, and the end of the slice stays at
--   @off + len@.
{-# INLINE_NORMAL readSlice #-}
readSlice :: (Prim a, Monad m) => Int -> Int -> Unfold m (PrimArray a) a
readSlice off len = Unfold step inject
  where
    -- Never start in front of the first element.
    inject arr = return (arr, max 0 off)
    step (arr, i)
        | i >= min (off + len) (length arr) = return D.Stop
        | otherwise = return $ D.Yield (indexPrimArray arr i) (arr, i + 1)

View File

@ -1,207 +0,0 @@
{-# LANGUAGE UnboxedTuples #-}
-- |
-- Module : Streamly.Internal.Data.Prim.Array.Types
-- Copyright : (c) Roman Leshchinskiy 2009-2012
-- License : BSD-style
--
-- Maintainer : streamly@composewell.com
-- Portability : non-portable
--
-- Arrays of unboxed primitive types. The functions provided by this module
-- match the behavior of those provided by @Data.Primitive.ByteArray@, and
-- the underlying types and primops that back them are the same.
-- However, the type constructors 'PrimArray' and 'MutablePrimArray' take one additional
-- argument than their respective counterparts 'ByteArray' and 'MutableByteArray'.
-- This argument is used to designate the type of element in the array.
-- Consequently, all functions in this module accept lengths and indices in
-- terms of elements, not bytes.
--
-- @since 0.6.4.0
module Streamly.Internal.Data.Prim.Array.Types
( -- * Types
PrimArray(..)
, MutablePrimArray(..)
-- * Allocation
, newPrimArray
, resizeMutablePrimArray
, shrinkMutablePrimArray
-- * Element Access
, writePrimArray
, indexPrimArray
-- * Freezing and Thawing
, unsafeFreezePrimArray
-- * Information
, sizeofPrimArray
-- * Folding
, foldrPrimArray
, foldlPrimArray'
) where
import GHC.Exts
import Data.Primitive.Types
import Data.Primitive.ByteArray (ByteArray(..))
import Control.Monad.Primitive
import qualified Data.Primitive.ByteArray as PB
-- | Arrays of unboxed elements. This accepts types like 'Double', 'Char',
-- 'Int', and 'Word', as well as their fixed-length variants ('Word8',
-- 'Word16', etc.). Since the elements are unboxed, a 'PrimArray' is strict
-- in its elements. This differs from the behavior of 'Array', which is lazy
-- in its elements.
--
-- The type parameter @a@ does not appear in the representation: the
-- constructor wraps a raw 'ByteArray#' and @a@ only records the element type.
data PrimArray a = PrimArray ByteArray#
-- | Mutable primitive arrays associated with a primitive state token.
-- These can be written to and read from in a monadic context that supports
-- sequencing such as 'IO' or 'ST'. Typically, a mutable primitive array will
-- be built and then converted to an immutable primitive array using
-- 'unsafeFreezePrimArray'. However, it is also acceptable to simply discard
-- a mutable primitive array since it lives in managed memory and will be
-- garbage collected when no longer referenced.
--
-- The constructor wraps a raw 'MutableByteArray#' indexed by the state token
-- @s@; the element type @a@ does not appear in the representation.
data MutablePrimArray s a = MutablePrimArray (MutableByteArray# s)
-- | Pointer-equality test on two byte arrays. Both arguments are coerced to
-- @()@ with 'unsafeCoerce#' so that they can be passed to
-- 'reallyUnsafePtrEquality#'; the primop's result is converted to 'Bool'
-- with 'isTrue#'.
sameByteArray :: ByteArray# -> ByteArray# -> Bool
sameByteArray lhs# rhs# =
    isTrue#
        (reallyUnsafePtrEquality#
            (unsafeCoerce# lhs# :: ())
            (unsafeCoerce# rhs# :: ()))
-- | Two arrays are equal when they are the same byte array, or when they
-- have the same size and are element-wise equal.
--
-- @since 0.6.4.0
instance (Eq a, Prim a) => Eq (PrimArray a) where
    lhs@(PrimArray lhs#) == rhs@(PrimArray rhs#)
        | sameByteArray lhs# rhs# = True
        | bytesL /= bytesR = False
        | otherwise = go (quot bytesL (sizeOf (undefined :: a)) - 1)
      where
        -- Sizes are compared in bytes rather than in elements, so the
        -- division needed for the element count is only performed when the
        -- element-wise comparison is actually required.
        bytesL = PB.sizeofByteArray (ByteArray lhs#)
        bytesR = PB.sizeofByteArray (ByteArray rhs#)

        -- Compare from the last index down to 0, stopping at the first
        -- mismatch.
        go !ix =
            ix < 0
                || (indexPrimArray lhs ix == indexPrimArray rhs ix
                        && go (ix - 1))
    {-# INLINE (==) #-}
-- | Lexicographic ordering. Subject to change between major versions.
--
-- Elements are compared pairwise over the common prefix; if the prefix is
-- equal, the arrays are ordered by their sizes in bytes.
--
-- @since 0.6.4.0
instance (Ord a, Prim a) => Ord (PrimArray a) where
    compare lhs@(PrimArray lhs#) rhs@(PrimArray rhs#)
        | sameByteArray lhs# rhs# = EQ
        | otherwise = go 0
      where
        bytesL = PB.sizeofByteArray (ByteArray lhs#)
        bytesR = PB.sizeofByteArray (ByteArray rhs#)

        -- Number of elements in the shorter of the two arrays.
        common = quot (min bytesL bytesR) (sizeOf (undefined :: a))

        -- The first non-EQ element comparison decides the result; later
        -- elements are only examined while everything so far compared EQ.
        go !ix
            | ix < common =
                case compare (indexPrimArray lhs ix) (indexPrimArray rhs ix) of
                    EQ -> go (ix + 1)
                    decided -> decided
            | otherwise = compare bytesL bytesR
    {-# INLINE compare #-}
-- | Renders as @fromListN \<length\> \<list of elements\>@, parenthesised
-- when the surrounding precedence is greater than 10.
--
-- @since 0.6.4.0
instance (Show a, Prim a) => Show (PrimArray a) where
    showsPrec prec arr = showParen (prec > 10) rendered
      where
        rendered =
            showString "fromListN "
                . shows (sizeofPrimArray arr)
                . showString " "
                . shows (primArrayToList arr)
-- | Convert the primitive array to a list. The list is produced through
-- 'build' from a right fold ('foldrPrimArray') over the array.
{-# INLINE primArrayToList #-}
primArrayToList :: forall a. Prim a => PrimArray a -> [a]
primArrayToList arr =
    build (\cons nil -> foldrPrimArray cons nil arr)
-- | Create a new mutable primitive array of the given length. The
-- underlying memory is left uninitialized.
--
-- The length is given in elements; the byte size passed to 'newByteArray#'
-- is the length multiplied by the element size.
{-# INLINE newPrimArray #-}
newPrimArray :: forall m a. (PrimMonad m, Prim a) => Int -> m (MutablePrimArray (PrimState m) a)
newPrimArray (I# count#) =
    primitive $ \st0# ->
        case newByteArray# (count# *# sizeOf# (undefined :: a)) st0# of
            (# st1#, marr# #) -> (# st1#, MutablePrimArray marr# #)
-- | Resize a mutable primitive array. The new size is given in elements.
--
-- This will either resize the array in-place or, if not possible, allocate the
-- contents into a new, unpinned array and copy the original array\'s contents.
--
-- To avoid undefined behaviour, the original 'MutablePrimArray' shall not be
-- accessed anymore after a 'resizeMutablePrimArray' has been performed.
-- Moreover, no reference to the old one should be kept in order to allow
-- garbage collection of the original 'MutablePrimArray' in case a new
-- 'MutablePrimArray' had to be allocated.
{-# INLINE resizeMutablePrimArray #-}
resizeMutablePrimArray :: forall m a. (PrimMonad m, Prim a)
    => MutablePrimArray (PrimState m) a
    -> Int -- ^ new size
    -> m (MutablePrimArray (PrimState m) a)
resizeMutablePrimArray (MutablePrimArray old#) (I# count#) =
    primitive $ \st0# ->
        -- The primop takes the new size in bytes.
        case resizeMutableByteArray# old# (count# *# sizeOf# (undefined :: a)) st0# of
            (# st1#, new# #) -> (# st1#, MutablePrimArray new# #)
-- Although it is possible to shim resizeMutableByteArray for old GHCs, this
-- is not the case with shrinkMutablePrimArray.
-- | Shrink a mutable primitive array. The new size is given in elements.
-- It must be smaller than the old size. The array will be resized in place.
-- This function is only available when compiling with GHC 7.10 or newer.
{-# INLINE shrinkMutablePrimArray #-}
shrinkMutablePrimArray :: forall m a. (PrimMonad m, Prim a)
    => MutablePrimArray (PrimState m) a
    -> Int -- ^ new size
    -> m ()
shrinkMutablePrimArray (MutablePrimArray marr#) (I# count#) =
    -- The primop takes the new size in bytes.
    primitive_ $
        shrinkMutableByteArray# marr# (count# *# sizeOf# (undefined :: a))
-- | Write an element to the given index. The index is in elements and is
-- passed to 'writeByteArray#' without any bounds check in this function.
{-# INLINE writePrimArray #-}
writePrimArray ::
       (Prim a, PrimMonad m)
    => MutablePrimArray (PrimState m) a -- ^ array
    -> Int -- ^ index
    -> a -- ^ element
    -> m ()
writePrimArray (MutablePrimArray marr#) (I# ix#) val =
    primitive_ $ writeByteArray# marr# ix# val
-- | Convert a mutable primitive array to an immutable one without copying.
-- The array should not be modified after the conversion.
{-# INLINE unsafeFreezePrimArray #-}
unsafeFreezePrimArray
    :: PrimMonad m => MutablePrimArray (PrimState m) a -> m (PrimArray a)
unsafeFreezePrimArray (MutablePrimArray marr#) =
    primitive $ \st0# ->
        case unsafeFreezeByteArray# marr# st0# of
            (# st1#, frozen# #) -> (# st1#, PrimArray frozen# #)
-- | Read a primitive value from the primitive array. The index is in
-- elements and is passed to 'indexByteArray#' without any bounds check in
-- this function.
{-# INLINE indexPrimArray #-}
indexPrimArray :: forall a. Prim a => PrimArray a -> Int -> a
indexPrimArray (PrimArray ba#) (I# ix#) = indexByteArray# ba# ix#
-- | Get the size, in elements, of the primitive array: the size of the
-- underlying byte array divided by the size of one element.
{-# INLINE sizeofPrimArray #-}
sizeofPrimArray :: forall a. Prim a => PrimArray a -> Int
sizeofPrimArray (PrimArray ba#) =
    I# (sizeofByteArray# ba# `quotInt#` sizeOf# (undefined :: a))
-- | Lazy right-associated fold over the elements of a 'PrimArray'.
--
-- Elements are visited from index 0 upwards; the recursive call is passed
-- unevaluated as the second argument of the combining function.
{-# INLINE foldrPrimArray #-}
foldrPrimArray :: forall a b. Prim a => (a -> b -> b) -> b -> PrimArray a -> b
foldrPrimArray combine end arr = walk 0
  where
    !count = sizeofPrimArray arr

    walk !ix
        | ix < count = combine (indexPrimArray arr ix) (walk (ix + 1))
        | otherwise = end
-- | Strict left-associated fold over the elements of a 'PrimArray'.
--
-- Elements are visited from index 0 upwards and the accumulator is forced at
-- every step.
{-# INLINE foldlPrimArray' #-}
foldlPrimArray' :: forall a b. Prim a => (b -> a -> b) -> b -> PrimArray a -> b
foldlPrimArray' combine initial arr = walk 0 initial
  where
    !count = sizeofPrimArray arr

    walk !ix !soFar
        | ix < count = walk (ix + 1) (combine soFar (indexPrimArray arr ix))
        | otherwise = soFar

View File

@ -0,0 +1,84 @@
-- |
-- Module : Streamly.Internal.Unicode.Array.Prim.Pinned
-- Copyright : (c) 2020 Composewell Technologies
--
-- License : BSD-3-Clause
-- Maintainer : streamly@composewell.com
-- Stability : experimental
-- Portability : GHC
--
module Streamly.Internal.Unicode.Array.Prim.Pinned
(
-- * Streams of Strings
lines
, words
, unlines
, unwords
)
where
import Control.Monad.IO.Class (MonadIO(..))
import Streamly (IsStream, MonadAsync)
import Prelude hiding (String, lines, words, unlines, unwords)
import Streamly.Internal.Data.Array.Prim.Pinned (Array)
import qualified Streamly.Internal.Data.Unicode.Stream as S
import qualified Streamly.Internal.Data.Array.Prim.Pinned as A
-- | Break a string up into a stream of strings at newline characters.
-- The resulting strings do not contain newlines.
--
-- Each resulting string is an 'Array' of 'Char' from
-- "Streamly.Internal.Data.Array.Prim.Pinned", collected with the 'A.write'
-- fold.
--
-- > lines = S.lines A.write
--
-- >>> S.toList $ lines $ S.fromList "lines\nthis\nstring\n\n\n"
-- ["lines","this","string","",""]
--
{-# INLINE lines #-}
lines :: (MonadIO m, IsStream t) => t m Char -> t m (Array Char)
lines = S.lines A.write
-- | Break a string up into a stream of strings, which were delimited
-- by characters representing white space.
--
-- Each resulting string is an 'Array' of 'Char' from
-- "Streamly.Internal.Data.Array.Prim.Pinned", collected with the 'A.write'
-- fold.
--
-- > words = S.words A.write
--
-- >>> S.toList $ words $ S.fromList "A newline\nis considered white space?"
-- ["A", "newline", "is", "considered", "white", "space?"]
--
{-# INLINE words #-}
words :: (MonadIO m, IsStream t) => t m Char -> t m (Array Char)
words = S.words A.write
-- | Flattens the stream of @Array Char@, after appending a terminating
-- newline to each string.
--
-- 'unlines' is an inverse operation to 'lines'.
--
-- >>> S.toList $ unlines $ S.fromList ["lines", "this", "string"]
-- "lines\nthis\nstring\n"
--
-- > unlines = S.unlines A.read
--
-- Each array is expanded into its characters with the 'A.read' unfold.
--
-- Note that, in general
--
-- > unlines . lines /= id
{-# INLINE unlines #-}
unlines :: (MonadAsync m, IsStream t) => t m (Array Char) -> t m Char
unlines = S.unlines A.read
-- | Flattens the stream of @Array Char@, after appending a separating
-- space to each string.
--
-- 'unwords' is an inverse operation to 'words'.
--
-- >>> S.toList $ unwords $ S.fromList ["unwords", "this", "string"]
-- "unwords this string"
--
-- > unwords = S.unwords A.read
--
-- Each array is expanded into its characters with the 'A.read' unfold.
--
-- NOTE(review): the example output has no trailing space, which suggests the
-- space is placed between strings rather than appended to each one --
-- confirm against 'S.unwords'.
--
-- Note that, in general
--
-- > unwords . words /= id
{-# INLINE unwords #-}
unwords :: (MonadAsync m, IsStream t) => t m (Array Char) -> t m Char
unwords = S.unwords A.read

View File

@ -143,6 +143,9 @@ extra-source-files:
examples/README.md
src/Streamly/Internal/Data/Stream/Instances.hs
src/Streamly/Internal/Data/Time/config-clock.h
src/Streamly/Internal/Data/Array/PrimInclude.hs
src/Streamly/Internal/Data/Array/Prim/TypesInclude.hs
src/Streamly/Internal/Data/Array/Prim/MutTypesInclude.hs
src/Streamly/Internal/Data/Time/config.h.in
src/inline.hs
stack.yaml
@ -378,8 +381,12 @@ library
-- Arrays
, Streamly.Internal.Data.Array
, Streamly.Internal.Data.Prim.Array.Types
, Streamly.Internal.Data.Prim.Array
, Streamly.Internal.Data.Array.Prim.Types
, Streamly.Internal.Data.Array.Prim
, Streamly.Internal.Data.Array.Prim.Mut.Types
, Streamly.Internal.Data.Array.Prim.Pinned.Types
, Streamly.Internal.Data.Array.Prim.Pinned
, Streamly.Internal.Data.Array.Prim.Pinned.Mut.Types
, Streamly.Internal.Data.SmallArray.Types
, Streamly.Internal.Data.SmallArray
, Streamly.Internal.Memory.Mutable.Array.Types
@ -439,6 +446,7 @@ library
, Streamly.Internal.Data.Unicode.Stream
, Streamly.Internal.Data.Unicode.Char
, Streamly.Internal.Memory.Unicode.Array
, Streamly.Internal.Unicode.Array.Prim.Pinned
if !impl(ghcjs)
exposed-modules:
@ -660,7 +668,24 @@ test-suite primarray-test
main-is: Streamly/Test/Array.hs
js-sources: jsbits/clock.js
hs-source-dirs: test
cpp-options: -DTEST_PRIM_ARRAY
cpp-options: -DDATA_ARRAY_PRIM
build-depends:
streamly
, base >= 4.8 && < 5
, QuickCheck >= 2.10 && < 2.15
, hspec >= 2.0 && < 3
if impl(ghc < 8.0)
build-depends:
transformers >= 0.4 && < 0.6
default-language: Haskell2010
test-suite prim-pinned-array-test
import: test-options
type: exitcode-stdio-1.0
main-is: Streamly/Test/Array.hs
js-sources: jsbits/clock.js
hs-source-dirs: test
cpp-options: -DDATA_ARRAY_PRIM_PINNED
build-depends:
streamly
, base >= 4.8 && < 5

View File

@ -25,13 +25,20 @@ import qualified Streamly.Prelude as S
import qualified Streamly.Internal.Data.SmallArray as A
type Array = A.SmallArray
#elif defined(TEST_ARRAY)
import qualified Streamly.Memory.Array as A
import qualified Streamly.Internal.Memory.Array as A
import qualified Streamly.Internal.Memory.Array.Types as A
import qualified Streamly.Internal.Prelude as IP
type Array = A.Array
#elif defined(DATA_ARRAY_PRIM_PINNED)
import qualified Streamly.Internal.Data.Array.Prim.Pinned as A
import qualified Streamly.Internal.Data.Array.Prim.Pinned.Types as A
import qualified Streamly.Internal.Prelude as IP
type Array = A.Array
#elif defined(DATA_ARRAY_PRIM)
import qualified Streamly.Internal.Data.Array.Prim as A
import qualified Streamly.Internal.Data.Array.Prim.Types as A
import qualified Streamly.Internal.Prelude as IP
type Array = A.Array
#elif defined(TEST_PRIM_ARRAY)
import qualified Streamly.Internal.Data.Prim.Array as A
type Array = A.PrimArray
#else
import qualified Streamly.Internal.Data.Array as A
type Array = A.Array
@ -122,17 +129,27 @@ testFoldUnfold :: Property
testFoldUnfold = genericTestFromTo (const (S.fold A.write)) (S.unfold A.read) (==)
#endif
#ifdef TEST_ARRAY
#if defined(TEST_ARRAY) ||\
defined(DATA_ARRAY_PRIM) ||\
defined(DATA_ARRAY_PRIM_PINNED)
testArraysOf :: Property
testArraysOf =
forAll (choose (0, maxArrLen)) $ \len ->
forAll (vectorOf len (arbitrary :: Gen Int)) $ \list ->
monadicIO $ do
xs <- S.toList
xs <- run
$ S.toList
$ S.concatUnfold A.read
$ IP.arraysOf 240
$ arraysOf 240
$ S.fromList list
assert (xs == list)
where
arraysOf n = IP.chunksOf n (A.writeNUnsafe n)
#endif
#ifdef TEST_ARRAY
lastN :: Int -> [a] -> [a]
lastN n l = drop (length l - n) l
@ -143,7 +160,8 @@ testLastN =
forAll (choose (0, len)) $ \n ->
forAll (vectorOf len (arbitrary :: Gen Int)) $ \list ->
monadicIO $ do
xs <- fmap A.toList
xs <- run
$ fmap A.toList
$ S.fold (A.lastN n)
$ S.fromList list
assert (xs == lastN n list)
@ -174,9 +192,13 @@ main =
prop "toStream . fromStream === id" testFromStreamToStream
prop "read . write === id" testFoldUnfold
#endif
#ifdef TEST_ARRAY
#if defined(TEST_ARRAY) ||\
defined(DATA_ARRAY_PRIM) ||\
defined(DATA_ARRAY_PRIM_PINNED)
prop "arraysOf concats to original" testArraysOf
#endif
#ifdef TEST_ARRAY
describe "Fold" $ do
prop "lastN : 0 <= n <= len" $ testLastN