Remove the primitive dependency from streamly-core

This commit is contained in:
Adithya Kumar 2022-09-16 16:04:35 +05:30
parent 41947be294
commit 99846a8f07
8 changed files with 199 additions and 113 deletions

View File

@ -27,7 +27,6 @@ benchIOSrc name src = benchIO name src id
sourceIntFromToFromList :: MonadIO m => Int -> Int -> m (Stream Int)
sourceIntFromToFromList value n = P.return $ A.fromListN value [n..n + value]
{-# INLINE readInstance #-}
readInstance :: P.String -> Stream Int
readInstance str =
@ -50,7 +49,8 @@ foldableSum = P.sum
{-# INLINE sourceIntFromToFromStream #-}
sourceIntFromToFromStream :: MonadIO m => Int -> Int -> m (Stream Int)
sourceIntFromToFromStream value n = S.fold A.write $ S.enumerateFromTo n (n + value)
sourceIntFromToFromStream value n =
S.fold A.write $ S.enumerateFromTo n (n + value)
-------------------------------------------------------------------------------
-- Bench groups
@ -61,7 +61,7 @@ o_1_space_generation value =
[ bgroup
"generation"
[ benchIOSrc "write . intFromTo" (sourceIntFromToFromStream value)
, let testStr = mkListString value
, let testStr = "fromList " ++ mkListString value
in testStr `deepseq` bench "read" (nf readInstance testStr)
]
]

View File

@ -1,6 +1,5 @@
{-# OPTIONS_GHC -fno-warn-orphans #-}
{-# LANGUAGE UnboxedTuples #-}
#include "inline.hs"
-- |
@ -33,6 +32,7 @@ module Streamly.Internal.Data.Array
, length
, read
, toList
, toStreamD
, toStreamDRev
, toStream
@ -50,40 +50,56 @@ module Streamly.Internal.Data.Array
)
where
#if !MIN_VERSION_primitive(0,7,1)
import Control.DeepSeq (NFData(..))
#endif
import Control.Monad (when)
import Control.Monad (when, replicateM)
import Control.Monad.IO.Class (liftIO, MonadIO)
import Control.Monad.Primitive (PrimMonad)
import Data.Functor.Identity (runIdentity)
import Data.IORef
import GHC.Base (Int(..))
import GHC.Base (MutableArray#, RealWorld)
import GHC.IO (unsafePerformIO)
import Text.Read (Lexeme(Ident), lexP, parens, readPrec)
import Streamly.Internal.Data.Fold.Type (Fold(..))
import Streamly.Internal.Data.Stream.Type (Stream)
import Streamly.Internal.Data.Tuple.Strict (Tuple'(..), Tuple3'(..))
import Streamly.Internal.Data.Unfold.Type (Unfold(..))
import qualified GHC.Exts as Exts
import qualified Streamly.Internal.Data.Array.Mut.Type as MArray
import qualified Streamly.Internal.Data.Fold.Type as FL
import qualified Streamly.Internal.Data.Ring as RB
import qualified Streamly.Internal.Data.Stream.StreamD as D
import qualified Streamly.Internal.Data.Stream.Type as Stream
import qualified Streamly.Internal.Data.Unfold as Unfold
import qualified Text.ParserCombinators.ReadPrec as ReadPrec
import Data.Primitive.Array hiding (fromList, fromListN)
import Data.IORef
import Prelude hiding (foldr, length, read)
{-# NOINLINE bottomElement #-}
bottomElement :: a
bottomElement = undefined
-------------------------------------------------------------------------------
-- Array Data Type
-------------------------------------------------------------------------------
-- | A boxed array represented as a slice over a 'MutableArray#' buffer: the
-- buffer itself plus the start index and length of the slice.
data Array a = Array
    { arrContents# :: MutableArray# RealWorld a
      -- ^ The underlying buffer holding the contents of the entire array.
    , arrStart :: {-# UNPACK #-} !Int
      -- ^ Index at which this slice begins.
    , arrLen :: {-# UNPACK #-} !Int
      -- ^ Number of elements in this slice.
    }
-- | Convert a mutable array into an 'Array' without copying. The buffer,
-- start index and length are carried over as they are and the fourth field of
-- the mutable array is dropped. The result shares the buffer with the
-- argument.
unsafeFreeze :: MArray.Array a -> Array a
unsafeFreeze marr =
    case marr of
        MArray.Array buf# start len _ ->
            Array {arrContents# = buf#, arrStart = start, arrLen = len}
-- | Convert an 'Array' into a mutable array without copying. The slice length
-- is used for both the third and the fourth field of the mutable array. The
-- result shares the buffer with the argument.
unsafeThaw :: Array a -> MArray.Array a
unsafeThaw Array {arrContents# = buf#, arrStart = start, arrLen = len} =
    MArray.Array buf# start len len
{-# NOINLINE nil #-}
nil :: Array a
nil = unsafePerformIO $ do
marr <- liftIO $ newArray 0 bottomElement
liftIO $ freezeArray marr 0 0
marr <- MArray.newArray 0
return $ unsafeFreeze marr
-------------------------------------------------------------------------------
-- Construction - Folds
@ -91,46 +107,26 @@ nil = unsafePerformIO $ do
{-# INLINE_NORMAL writeN #-}
writeN :: MonadIO m => Int -> Fold m a (Array a)
writeN len = Fold step initial extract
where
{-# INLINE next #-}
next marr i = do
let i1 = i + 1
st = Tuple' marr i1
if len > i1
then return $ FL.Partial st
else fmap FL.Done $ extract st
initial = do
marr <- liftIO $ newArray len bottomElement
next marr (-1)
step (Tuple' marr i) x = do
liftIO $ writeArray marr i x
next marr i
extract (Tuple' marr l) = liftIO $ freezeArray marr 0 l
writeN = fmap unsafeFreeze <$> MArray.writeN
{-# INLINE_NORMAL write #-}
write :: MonadIO m => Fold m a (Array a)
write = Fold step initial extract
where
initial = do
marr <- liftIO $ newArray 0 bottomElement
marr <- liftIO $ MArray.newArray 0
return $ FL.Partial (Tuple3' marr 0 0)
step (Tuple3' marr i capacity) x
| i == capacity =
let newCapacity = max (capacity * 2) 1
in do newMarr <- liftIO $ newArray newCapacity bottomElement
liftIO $ copyMutableArray newMarr 0 marr 0 i
liftIO $ writeArray newMarr i x
return $ FL.Partial $ Tuple3' newMarr (i + 1) newCapacity
in do newMarr <- liftIO $ MArray.realloc newCapacity marr
marr1 <- liftIO $ MArray.snocUnsafe newMarr x
return $ FL.Partial $ Tuple3' marr1 (i + 1) newCapacity
| otherwise = do
liftIO $ writeArray marr i x
return $ FL.Partial $ Tuple3' marr (i + 1) capacity
extract (Tuple3' marr len _) = liftIO $ freezeArray marr 0 len
marr1 <- liftIO $ MArray.snocUnsafe marr x
return $ FL.Partial $ Tuple3' marr1 (i + 1) capacity
extract (Tuple3' marr _ _) =
return $ unsafeFreeze marr
-------------------------------------------------------------------------------
-- Construction - from streams
@ -139,13 +135,13 @@ write = Fold step initial extract
{-# INLINE_NORMAL fromStreamDN #-}
fromStreamDN :: MonadIO m => Int -> D.Stream m a -> m (Array a)
fromStreamDN limit str = do
marr <- liftIO $ newArray (max limit 0) bottomElement
marr <- liftIO $ MArray.newArray (max limit 0)
i <-
D.foldlM'
(\i x -> i `seq` liftIO $ writeArray marr i x >> return (i + 1))
(\i x -> i `seq` liftIO $ MArray.putIndexUnsafe marr i x >> return (i + 1))
(return 0) $
D.take limit str
liftIO $ freezeArray marr 0 i
return $ unsafeFreeze $ marr { MArray.arrLen = i }
{-# INLINE fromStreamD #-}
fromStreamD :: MonadIO m => D.Stream m a -> m (Array a)
@ -175,50 +171,38 @@ fromList xs = unsafePerformIO $ fromStreamD $ D.fromList xs
{-# INLINE length #-}
length :: Array a -> Int
length = sizeofArray
length = arrLen
{-# INLINE_NORMAL read #-}
read :: Monad m => Unfold m (Array a) a
read = Unfold step inject
where
inject arr = return (arr, 0)
step (arr, i)
| i == length arr = return D.Stop
step (arr, I# i) =
return $
case Exts.indexArray# (array# arr) i of
(# x #) -> D.Yield x (arr, I# i + 1)
read :: MonadIO m => Unfold m (Array a) a
read = Unfold.lmap unsafeThaw MArray.read
-------------------------------------------------------------------------------
-- Elimination - to streams
-------------------------------------------------------------------------------
{-# INLINE_NORMAL toList #-}
toList :: Array a -> [a]
toList arr = loop 0
where
len = length arr
loop i | i == len = []
loop i = getIndexUnsafe arr i : loop (i + 1)
{-# INLINE_NORMAL toStreamD #-}
toStreamD :: Monad m => Array a -> D.Stream m a
toStreamD arr = D.Stream step 0
where
{-# INLINE_LATE step #-}
step _ i
| i == length arr = return D.Stop
step _ (I# i) =
return $
case Exts.indexArray# (array# arr) i of
(# x #) -> D.Yield x (I# i + 1)
toStreamD :: MonadIO m => Array a -> D.Stream m a
toStreamD = MArray.toStreamD . unsafeThaw
{-# INLINE_NORMAL toStreamDRev #-}
toStreamDRev :: Monad m => Array a -> D.Stream m a
toStreamDRev arr = D.Stream step (length arr - 1)
where
{-# INLINE_LATE step #-}
step _ i
| i < 0 = return D.Stop
step _ (I# i) =
return $
case Exts.indexArray# (array# arr) i of
(# x #) -> D.Yield x (I# i - 1)
toStreamDRev arr@Array{..} =
D.map (getIndexUnsafe arr)
$ D.enumerateFromThenToIntegral (arrLen - 1) (arrLen - 2) 0
{-# INLINE_EARLY toStream #-}
toStream :: Monad m => Array a -> Stream m a
toStream :: MonadIO m => Array a -> Stream m a
toStream = Stream.fromStreamD . toStreamD
{-# INLINE_EARLY toStreamRev #-}
@ -231,24 +215,22 @@ toStreamRev = Stream.fromStreamD . toStreamDRev
{-# INLINE_NORMAL foldl' #-}
foldl' :: (b -> a -> b) -> b -> Array a -> b
foldl' f z arr = runIdentity $ D.foldl' f z $ toStreamD arr
foldl' f z arr = unsafePerformIO $ D.foldl' f z $ toStreamD arr
{-# INLINE_NORMAL foldr #-}
foldr :: (a -> b -> b) -> b -> Array a -> b
foldr f z arr = runIdentity $ D.foldr f z $ toStreamD arr
foldr f z arr = unsafePerformIO $ D.foldr f z $ toStreamD arr
#if !MIN_VERSION_primitive(0,7,1)
instance NFData a => NFData (Array a) where
{-# INLINE rnf #-}
rnf = foldl' (\_ x -> rnf x) ()
#endif
{-# INLINE fold #-}
fold :: Monad m => Fold m a b -> Array a -> m b
fold :: MonadIO m => Fold m a b -> Array a -> m b
fold f arr = D.fold f (toStreamD arr)
{-# INLINE streamFold #-}
streamFold :: Monad m => (Stream m a -> m b) -> Array a -> m b
streamFold :: MonadIO m => (Stream m a -> m b) -> Array a -> m b
streamFold f arr = f (toStream arr)
-------------------------------------------------------------------------------
@ -261,12 +243,13 @@ streamFold f arr = f (toStream arr)
-- @since 0.8.0
{-# INLINE getIndexUnsafe #-}
getIndexUnsafe :: Array a -> Int -> a
getIndexUnsafe = indexArray
getIndexUnsafe arr i =
unsafePerformIO $ MArray.getIndexUnsafe (unsafeThaw arr) i
{-# INLINE writeLastN #-}
writeLastN :: MonadIO m => Int -> Fold m a (Array a)
writeLastN n
| n <= 0 = fmap (const mempty) FL.drain
| n <= 0 = fmap (const nil) FL.drain
| otherwise = Fold step initial done
where
@ -280,15 +263,18 @@ writeLastN n
return $ FL.Partial $ Tuple' rb (rh + 1)
done (Tuple' rb rh) = do
arr' <- liftIO $ newArray (min rh n) (undefined :: a)
arr' <- liftIO $ MArray.newArray (min rh n)
ref <- liftIO $ readIORef $ RB.ringHead rb
if rh < n
then
liftIO $ copyMutableArray arr' 0 (RB.arr rb) 0 ref
MArray.putSliceUnsafe (RB.arr rb) 0 arr' 0 ref
else do
liftIO $ copyMutableArray arr' 0 (RB.arr rb) ref (n - ref)
liftIO $ copyMutableArray arr' (n - ref) (RB.arr rb) 0 ref
liftIO $ unsafeFreezeArray arr'
MArray.putSliceUnsafe (RB.arr rb) ref arr' 0 (n - ref)
MArray.putSliceUnsafe (RB.arr rb) 0 arr' (n - ref) ref
return $ unsafeFreeze arr'
-- | @sliceUnsafe offset len arr@ is a view on the same buffer as @arr@ that
-- starts @offset@ elements after the start of @arr@ and spans @len@ elements.
-- Nothing is copied and no bounds are checked.
sliceUnsafe :: Int -> Int -> Array a -> Array a
sliceUnsafe offset len src =
    Array
        { arrContents# = arrContents# src
        , arrStart = arrStart src + offset
        , arrLen = len
        }
-- XXX This is not efficient as it copies the array. We should support array
-- slicing so that we can just refer to the underlying array memory instead of
@ -306,30 +292,86 @@ strip p arr =
let indexL = getIndexL 0 -- first predicate failing index
in if indexL == 0 && indexR == lastIndex
then arr
else cloneArray arr indexL (indexR - indexL + 1)
else unsafeFreeze $ unsafePerformIO $ do
let newLen = indexR - indexL + 1
arrThawed = unsafeThaw (sliceUnsafe indexL newLen arr)
MArray.clone arrThawed
where
getIndexR idx
| idx < 0 = idx
| otherwise =
if p (indexArray arr idx) then getIndexR (idx - 1) else idx
if p (getIndexUnsafe arr idx) then getIndexR (idx - 1) else idx
getIndexL idx = if p (indexArray arr idx) then getIndexL (idx + 1) else idx
getIndexL idx = if p (getIndexUnsafe arr idx)
then getIndexL (idx + 1)
else idx
-- | Write an input stream of (index, value) pairs to an array. Throws an
-- error if any index is out of bounds.
--
-- /Pre-release/
{-# INLINE putIndices #-}
putIndices :: PrimMonad m
putIndices :: MonadIO m
=> Array a -> Fold m (Int, a) ()
putIndices arr = FL.rmapM (\ _ -> return ()) (FL.foldlM' step initial)
where
initial = unsafeThawArray arr
initial = return $ unsafeThaw arr
step marr (i, x) = do
writeArray marr i x
MArray.putIndexUnsafe marr i x
return marr
-------------------------------------------------------------------------------
-- Instances
-------------------------------------------------------------------------------
instance Eq a => Eq (Array a) where
    -- Two arrays are equal when their lengths match and they agree at every
    -- index. Indices are visited from the last one down to 0 and the
    -- comparison stops at the first mismatch.
    a1 == a2 = n == length a2 && all sameAt [n - 1, n - 2 .. 0]

        where

        n = length a1

        sameAt i = getIndexUnsafe a1 i == getIndexUnsafe a2 i
instance Ord a => Ord (Array a) where
    -- Lengths are compared first; only arrays of equal length are compared
    -- element by element, from index 0 upwards. 'mconcat' on 'Ordering'
    -- returns the first non-EQ result and does not evaluate the remaining
    -- comparisons.
    compare a1 a2 =
        mconcat (compare n (length a2) : map cmpAt [0 .. n - 1])

        where

        n = length a1

        cmpAt i = compare (getIndexUnsafe a1 i) (getIndexUnsafe a2 i)
-- | Arrays are shown as @fromList@ applied to the list of elements, e.g.
-- @fromList [1,2,3]@.
instance Show a => Show (Array a) where
    -- Implemented via 'showsPrec' rather than 'show' so that the rendering is
    -- parenthesised when it appears as a constructor argument, e.g.
    -- @Just (fromList [1,2,3])@. At the top level (precedence 0) the output
    -- is unchanged: @fromList [...]@.
    showsPrec d arr =
        showParen (d > 10) $ showString "fromList " . shows (toList arr)
-- | Parses the @fromList [...]@ syntax produced by the 'Show' instance.
instance Read a => Read (Array a) where
    -- Token based parsing (instead of consuming exactly nine raw characters)
    -- so that leading whitespace, optional surrounding parentheses and use as
    -- a constructor argument, e.g. @Just (fromList [1,2,3])@, are all
    -- accepted. @prec 10@ is the function application precedence.
    readPrec =
        parens $ ReadPrec.prec 10 $ do
            tok <- lexP
            case tok of
                Ident "fromList" -> fromList <$> readPrec
                _ -> ReadPrec.pfail

View File

@ -59,6 +59,9 @@ module Streamly.Internal.Data.Array.Mut.Type
-- and therefore have a cons as well as snoc. But that will require two
-- bounds in the array representation.
-- ** Reallocation
, realloc
-- ** Appending elements
, snocWith
, snoc
@ -146,15 +149,17 @@ module Streamly.Internal.Data.Array.Mut.Type
-- , spliceWith
-- , splice
-- , spliceExp
-- , putSlice
, putSliceUnsafe
-- , appendSlice
-- , appendSliceFrom
, clone
)
where
#include "inline.hs"
#include "assert.hs"
import Control.Exception (assert)
import Control.Monad.IO.Class (MonadIO(..))
import GHC.Base
( MutableArray#
@ -568,3 +573,31 @@ producer = Producer step inject extract
{-# INLINE_NORMAL read #-}
read :: MonadIO m => Unfold m (Array a) a
read = Producer.simplify producer
--------------------------------------------------------------------------------
-- Appending arrays
--------------------------------------------------------------------------------
-- | @putSliceUnsafe src srcStart dst dstStart len@ copies @len@ elements of
-- @src@, beginning at index @srcStart@, into @dst@ beginning at index
-- @dstStart@. Both indices are relative to the start of the respective array
-- slice ('arrStart' is added internally).
--
-- This is not safe: the ranges are validated only through 'assertM', the copy
-- itself does no bounds checking.
{-# INLINE putSliceUnsafe #-}
putSliceUnsafe :: MonadIO m => Array a -> Int -> Array a -> Int -> Int -> m ()
putSliceUnsafe src srcStart dst dstStart len = liftIO $ do
    -- The whole range [start, start + len) must lie inside the slice, so the
    -- start offsets are part of the check and not just the length.
    assertM(len >= 0)
    assertM(srcStart >= 0 && srcStart + len <= arrLen src)
    assertM(dstStart >= 0 && dstStart + len <= arrLen dst)
    -- Translate the slice relative indices to indices into the buffers.
    let !(I# srcStart#) = srcStart + arrStart src
        !(I# dstStart#) = dstStart + arrStart dst
        !(I# len#) = len
    let arrS# = arrContents# src
        arrD# = arrContents# dst
    IO $ \s# -> (# copyMutableArray#
                    arrS# srcStart# arrD# dstStart# len# s#
                , () #)
-- | Copy the elements of the given array slice into a newly allocated array
-- and return the new array. The result has the same length as the source.
{-# INLINE clone #-}
clone :: MonadIO m => Array a -> m (Array a)
clone src = liftIO $ do
    let len = arrLen src
    fresh <- newArray len
    -- Set the length explicitly instead of relying on whatever length
    -- 'newArray' assigns to a fresh array: the copy below fills exactly @len@
    -- slots and callers use 'arrLen' of the result as the element count.
    -- Doing it before the copy also lets the length check in
    -- 'putSliceUnsafe' see the final length of the destination.
    let dst = fresh { arrLen = len }
    putSliceUnsafe src 0 dst 0 len
    return dst

View File

@ -13,12 +13,15 @@ module Streamly.Internal.Data.Ring
, unsafeInsertRing
) where
import Control.Monad.Primitive (PrimMonad(PrimState))
import Data.IORef (modifyIORef', newIORef, readIORef, writeIORef, IORef)
import Data.Primitive.Array (newArray, writeArray, MutableArray)
import Streamly.Internal.Data.Array.Mut.Type
( Array(..)
, newArray
, putIndexUnsafe
)
data Ring a = Ring
{ arr :: MutableArray (PrimState IO) a
{ arr :: Array a
, ringHead :: IORef Int -- current index to be over-written
, ringMax :: !Int -- first index beyond allocated memory
}
@ -26,7 +29,7 @@ data Ring a = Ring
{-# INLINE createRing #-}
createRing :: Int -> IO (Ring a)
createRing count = do
arr' <- newArray count (undefined :: a)
arr' <- newArray count
head' <- newIORef 0
return (Ring
{ arr = arr'
@ -37,7 +40,7 @@ createRing count = do
{-# INLINE unsafeInsertRing #-}
unsafeInsertRing :: Ring a -> Int -> a -> IO ()
unsafeInsertRing Ring{..} idx x = do
writeArray arr (mod idx ringMax) x
putIndexUnsafe arr (mod idx ringMax) x
ref <- readIORef ringHead
if (ref+1) < ringMax
then modifyIORef' ringHead ( + 1)

View File

@ -366,7 +366,7 @@ joinOuter :: MonadIO m =>
joinOuter eq s1 s =
Stream.concatM $ do
inputArr <- Array.fromStream s
let len = length inputArr
let len = Array.length inputArr
foundArr <-
Stream.fold
(MA.writeN len)

View File

@ -320,7 +320,6 @@ library
, Streamly.Internal.Data.List
-- streamly-core-data-arrays
-- XXX Depends on primitive
, Streamly.Internal.Data.Array
, Streamly.Internal.Data.Array.Unboxed
, Streamly.Internal.Data.Array.Stream.Mut.Foreign
@ -328,7 +327,6 @@ library
-- Ring Arrays
, Streamly.Internal.Data.Ring.Foreign
-- XXX Depends on primitive
, Streamly.Internal.Data.Ring
-- Only used for benchmarks
@ -368,7 +366,6 @@ library
-- XXX to be removed
, transformers-base >= 0.4 && < 0.5
, primitive >= 0.5.4 && < 0.8
, heaps >= 0.3 && < 0.5
if flag(use-unliftio)
build-depends: unliftio-core >= 0.2 && < 0.3

View File

@ -67,7 +67,8 @@ import Streamly.Internal.Data.Time.Units (NanoSecond64(..), toRelTime64)
import qualified Data.List as List
import qualified Data.Map.Strict as Map
import qualified Streamly.Internal.Data.Array as Array (fromStream, toStream)
import qualified Streamly.Internal.Data.Array as Array
(fromStream, length, toStream)
import qualified Streamly.Data.Array.Unboxed.Mut as MA
import qualified Streamly.Internal.Data.Fold as Fold
(head, last, toStream, toStreamRev)
@ -487,7 +488,7 @@ joinOuter :: MonadIO m =>
joinOuter eq s1 s =
Stream.concatM $ do
inputArr <- Array.fromStream s
let len = length inputArr
let len = Array.length inputArr
foundArr <-
Stream.fold
(MA.writeN len)

View File

@ -37,6 +37,15 @@ testFromList =
testLengthFromStream :: Property
testLengthFromStream = genericTestFrom (const A.fromStream)
-- | Property: for a list of up to 'maxArrLen' Ints, building an array from
-- the list, rendering it with 'show', parsing it back with 'read' and
-- converting to a list gives back the original list.
testReadShowInstance :: Property
testReadShowInstance =
    forAll (choose (0, maxArrLen)) $ \len ->
        forAll (vectorOf len (arbitrary :: Gen Int)) $ \list ->
            monadicIO $ assert (roundTrip list == list)

    where

    roundTrip :: [Int] -> [Int]
    roundTrip = A.toList . read . show . A.fromList
main :: IO ()
main =
hspec $
@ -49,3 +58,4 @@ main =
prop "toStream . fromStream === id" testFromStreamToStream
prop "read . write === id" testFoldUnfold
prop "fromList" testFromList
prop "testReadShowInstance" testReadShowInstance