Split the current Array into Array and Mutable.Array

* Add unsafeFreezeWithShrink
* Cleanup + Reformat code + Respect hlint
This commit is contained in:
adithyaov 2020-02-05 18:50:08 +05:30
parent 27c3aebbce
commit 12c672ef1d
11 changed files with 1701 additions and 1080 deletions

View File

@ -130,7 +130,9 @@ import Prelude hiding (read)
import qualified GHC.IO.FD as FD
import qualified GHC.IO.Device as RawIO
import Streamly.Internal.Memory.Array.Types (Array(..), byteLength, defaultChunkSize)
import Streamly.Internal.Memory.Array.Types (Array(..), byteLength, defaultChunkSize, unsafeFreeze)
import Streamly.Internal.Memory.Mutable.Array.Types (mutableArray)
import Streamly.Internal.Data.Stream.Serial (SerialT)
import Streamly.Internal.Data.Stream.StreamK.Type (IsStream, mkStream)
@ -216,14 +218,10 @@ readArrayUpto size (Handle fd) = do
withForeignPtr ptr $ \p -> do
-- n <- hGetBufSome h p size
n <- RawIO.read fd p size
let v = Array
{ aStart = ptr
, aEnd = p `plusPtr` n
, aBound = p `plusPtr` size
}
-- XXX shrink only if the diff is significant
-- A.shrinkToFit v
return v
-- Use unsafeFreezeWithShrink
return $
unsafeFreeze $ mutableArray ptr (p `plusPtr` n) (p `plusPtr` size)
-------------------------------------------------------------------------------
-- Array IO (output)

View File

@ -363,6 +363,7 @@ import Streamly.Internal.Data.Stream.SVar (fromConsumer, pushToFold)
import qualified Streamly.Internal.Data.Pipe.Types as Pipe
import qualified Streamly.Internal.Memory.Array.Types as A
import qualified Streamly.Internal.Memory.Mutable.Array.Types as MA
import qualified Streamly.Internal.Data.Fold as FL
import qualified Streamly.Memory.Ring as RB
import qualified Streamly.Internal.Data.Stream.StreamK as K
@ -4516,16 +4517,16 @@ tapAsync f (Stream step1 state1) = Stream step TapInit
lastN :: (Storable a, MonadIO m) => Int -> Fold m a (Array a)
lastN n
| n <= 0 = fmap (const mempty) FL.drain
| otherwise = Fold step initial done
| otherwise = A.unsafeFreeze <$> Fold step initial done
where
step (Tuple3' rb rh i) a = do
rh1 <- liftIO $ RB.unsafeInsert rb rh a
return $ Tuple3' rb rh1 (i + 1)
initial = fmap (\(a, b) -> Tuple3' a b (0 :: Int)) $ liftIO $ RB.new n
done (Tuple3' rb rh i) = do
arr <- liftIO $ A.newArray n
arr <- liftIO $ MA.newArray n
foldFunc i rh snoc' arr rb
snoc' b a = liftIO $ A.unsafeSnoc b a
snoc' b a = liftIO $ MA.unsafeSnoc b a
foldFunc i
| i < n = RB.unsafeFoldRingM
| otherwise = RB.unsafeFoldRingFullM

View File

@ -192,7 +192,7 @@ decode0 table byte =
utf8table =
let !(Ptr addr) = table
end = table `plusPtr` 364
in A.Array (ForeignPtr addr undefined) end end :: A.Array Word8
in A.Array (ForeignPtr addr undefined) end :: A.Array Word8
showByte = "Streamly: decode0: byte: " ++ show byte
showTable = " table: " ++ show utf8table
@ -219,7 +219,7 @@ decode1 table state codep byte =
utf8table =
let !(Ptr addr) = table
end = table `plusPtr` 364
in A.Array (ForeignPtr addr undefined) end end :: A.Array Word8
in A.Array (ForeignPtr addr undefined) end :: A.Array Word8
showByte = "Streamly: decode1: byte: " ++ show byte
showState st cp =
" state: " ++ show st ++
@ -246,7 +246,7 @@ data FreshPoint s a
{-# INLINE_NORMAL decodeUtf8WithD #-}
decodeUtf8WithD :: Monad m => CodingFailureMode -> Stream m Word8 -> Stream m Char
decodeUtf8WithD cfm (Stream step state) =
let A.Array p _ _ = utf8d
let A.Array p _ = utf8d
!ptr = unsafeForeignPtrToPtr p
in Stream (step' ptr) (FreshPointDecodeInit state)
where
@ -337,7 +337,7 @@ resumeDecodeUtf8EitherD
-> Stream m Word8
-> Stream m (Either DecodeError Char)
resumeDecodeUtf8EitherD dst codep (Stream step state) =
let A.Array p _ _ = utf8d
let A.Array p _ = utf8d
!ptr = unsafeForeignPtrToPtr p
stt =
if dst == 0
@ -431,7 +431,7 @@ decodeUtf8ArraysWithD ::
-> Stream m (A.Array Word8)
-> Stream m Char
decodeUtf8ArraysWithD cfm (Stream step state) =
let A.Array p _ _ = utf8d
let A.Array p _ = utf8d
!ptr = unsafeForeignPtrToPtr p
in Stream (step' ptr) (OuterLoop state Nothing)
where

View File

@ -114,8 +114,9 @@ import Streamly.Data.Fold (Fold)
import Streamly.Internal.Data.Fold.Types (Fold2(..))
import Streamly.Internal.Data.Unfold.Types (Unfold(..))
import Streamly.Internal.Memory.Array.Types
(Array(..), writeNUnsafe, defaultChunkSize, shrinkToFit,
lpackArraysChunksOf)
(Array(..), writeNUnsafe, defaultChunkSize, lpackArraysChunksOf
, unsafeFreezeWithShrink)
import Streamly.Internal.Memory.Mutable.Array.Types (mutableArray)
import Streamly.Internal.Data.Stream.Serial (SerialT)
import Streamly.Internal.Data.Stream.StreamK.Type (IsStream, mkStream)
-- import Streamly.String (encodeUtf8, decodeUtf8, foldLines)
@ -157,13 +158,10 @@ readArrayUpto size h = do
-- ptr <- mallocPlainForeignPtrAlignedBytes size (alignment (undefined :: Word8))
withForeignPtr ptr $ \p -> do
n <- hGetBufSome h p size
let v = Array
{ aStart = ptr
, aEnd = p `plusPtr` n
, aBound = p `plusPtr` size
}
-- XXX shrink only if the diff is significant
shrinkToFit v
return $
unsafeFreezeWithShrink $
mutableArray ptr (p `plusPtr` n) (p `plusPtr` size)
-------------------------------------------------------------------------------
-- Stream of Arrays IO

View File

@ -154,14 +154,6 @@ import qualified Streamly.Internal.Data.Stream.StreamK as K
-- Construction
-------------------------------------------------------------------------------
{-
-- | Create a new uninitialized array of given length.
--
-- @since 0.7.0
newArray :: (MonadIO m, Storable a) => Int -> m (Array a)
newArray len = undefined
-}
-- | Create an 'Array' from the first N elements of a stream. The array is
-- allocated to size N, if the stream terminates before N elements then the
-- array may hold less than N elements.
@ -223,7 +215,7 @@ read :: forall m a. (Monad m, Storable a) => Unfold m (Array a) a
read = Unfold step inject
where
inject (Array (ForeignPtr start contents) (Ptr end) _) =
inject (Array (ForeignPtr start contents) (Ptr end)) =
return $ ReadUState (ForeignPtr end contents) (Ptr start)
{-# INLINE_LATE step #-}
@ -257,7 +249,7 @@ unsafeRead :: forall m a. (Monad m, Storable a) => Unfold m (Array a) a
unsafeRead = Unfold step inject
where
inject (Array fp _ _) = return fp
inject (Array fp _) = return fp
{-# INLINE_LATE step #-}
step (ForeignPtr p contents) = do
@ -448,7 +440,7 @@ writeIndex arr i a = do
-- @since 0.7.0
{-# INLINE writeSlice #-}
writeSlice :: (IsStream t, Monad m, Storable a)
=> Array a -> Int -> Int -> t m a -> m ()
=> Array a -> Int -> Int -> t m a -> m (Array a)
writeSlice arr i len s = undefined
-- | @writeSliceRev arr i count stream@ writes a stream to the array @arr@
@ -458,7 +450,7 @@ writeSlice arr i len s = undefined
-- @since 0.7.0
{-# INLINE writeSliceRev #-}
writeSliceRev :: (IsStream t, Monad m, Storable a)
=> Array a -> Int -> Int -> t m a -> m ()
=> Array a -> Int -> Int -> t m a -> m (Array a)
writeSliceRev arr i len s = undefined
-}

File diff suppressed because it is too large Load Diff

View File

@ -2,9 +2,9 @@
-- |
-- Module : Streamly.Internal.Memory.ArrayStream
-- Copyright : (c) 2019 Composewell Technologies
-- Copyright : (c) 2020 Composewell Technologies
--
-- License : BSD3
-- License : BSD3-3-Clause
-- Maintainer : streamly@composewell.com
-- Stability : experimental
-- Portability : GHC
@ -47,6 +47,7 @@ import Streamly.Internal.Data.Stream.StreamK.Type (IsStream)
import qualified Streamly.Internal.Memory.Array as A
import qualified Streamly.Internal.Memory.Array.Types as A
import qualified Streamly.Internal.Memory.Mutable.Array.Types as MA
import qualified Streamly.Internal.Prelude as S
import qualified Streamly.Internal.Data.Stream.StreamD as D
import qualified Streamly.Internal.Data.Stream.Prelude as P
@ -155,17 +156,35 @@ arraysOf n str =
-- stream.
{-# INLINE spliceArraysLenUnsafe #-}
spliceArraysLenUnsafe :: (MonadIO m, Storable a)
=> Int -> SerialT m (Array a) -> m (Array a)
=> Int -> SerialT m (MA.Array a) -> m (MA.Array a)
spliceArraysLenUnsafe len buffered = do
arr <- liftIO $ A.newArray len
end <- S.foldlM' writeArr (return $ aEnd arr) buffered
return $ arr {aEnd = end}
arr <- liftIO $ MA.newArray len
end <- S.foldlM' writeArr (return $ MA.aEnd arr) buffered
return $ arr {MA.aEnd = end}
where
writeArr dst Array{..} =
liftIO $ withForeignPtr aStart $ \src -> do
let count = aEnd `minusPtr` src
writeArr dst (MA.Array as ae _) =
liftIO $ withForeignPtr as $ \src -> do
let count = ae `minusPtr` src
A.memcpy (castPtr dst) (castPtr src) count
return $ dst `plusPtr` count
{-# INLINE _spliceArrays #-}
_spliceArrays :: (MonadIO m, Storable a)
=> SerialT m (Array a) -> m (Array a)
_spliceArrays s = do
buffered <- P.foldr S.cons S.nil s
len <- S.sum (S.map length buffered)
arr <- liftIO $ MA.newArray len
end <- S.foldlM' writeArr (return $ MA.aEnd arr) s
return $ A.unsafeFreeze $ arr {MA.aEnd = end}
where
writeArr dst (Array as ae) =
liftIO $ withForeignPtr as $ \src -> do
let count = ae `minusPtr` src
A.memcpy (castPtr dst) (castPtr src) count
return $ dst `plusPtr` count
@ -175,17 +194,17 @@ _spliceArraysBuffered :: (MonadIO m, Storable a)
_spliceArraysBuffered s = do
buffered <- P.foldr S.cons S.nil s
len <- S.sum (S.map length buffered)
spliceArraysLenUnsafe len s
A.unsafeFreeze <$> spliceArraysLenUnsafe len (S.map A.unsafeThaw s)
{-# INLINE spliceArraysRealloced #-}
spliceArraysRealloced :: forall m a. (MonadIO m, Storable a)
=> SerialT m (Array a) -> m (Array a)
spliceArraysRealloced s = do
let idst = liftIO $ A.newArray (A.bytesToElemCount (undefined :: a)
(A.mkChunkSizeKB 4))
let idst = liftIO $ MA.newArray (A.bytesToElemCount (undefined :: a)
(A.mkChunkSizeKB 4))
arr <- S.foldlM' A.spliceWithDoubling idst s
liftIO $ A.shrinkToFit arr
arr <- S.foldlM' MA.spliceWithDoubling idst (S.map A.unsafeThaw s)
liftIO $ A.unsafeFreeze <$> MA.shrinkToFit arr
-- | Given a stream of arrays, splice them all together to generate a single
-- array. The stream must be /finite/.

File diff suppressed because it is too large Load Diff

View File

@ -90,6 +90,7 @@ import qualified Network.Socket as Net
import Streamly (MonadAsync)
import Streamly.Internal.Data.Unfold.Types (Unfold(..))
import Streamly.Internal.Memory.Array.Types (Array(..), lpackArraysChunksOf)
import Streamly.Internal.Memory.Mutable.Array.Types (mutableArray)
import Streamly.Internal.Data.Stream.Serial (SerialT)
import Streamly.Internal.Data.Stream.StreamK.Type (IsStream, mkStream)
import Streamly.Data.Fold (Fold)
@ -255,11 +256,9 @@ readArrayUptoWith f size h = do
-- ptr <- mallocPlainForeignPtrAlignedBytes size (alignment (undefined :: Word8))
withForeignPtr ptr $ \p -> do
n <- f h p size
let v = Array
{ aStart = ptr
, aEnd = p `plusPtr` n
, aBound = p `plusPtr` size
}
let v = A.unsafeFreeze
$ mutableArray ptr (p `plusPtr` n) (p `plusPtr` size)
-- XXX shrink only if the diff is significant
-- A.shrinkToFit v
return v
@ -469,6 +468,7 @@ writeChunksWithBufferOf :: (MonadIO m, Storable a)
=> Int -> Socket -> Fold m (Array a) ()
writeChunksWithBufferOf n h = lpackArraysChunksOf n (writeChunks h)
-- | Write a stream of strings to a socket in Latin1 encoding. Output is
-- flushed to the socket for each string.
--

View File

@ -101,7 +101,7 @@ unsafeEqArrayN Ring{..} rh A.Array{..} n =
let !res = A.unsafeInlineIO $ do
let rs = unsafeForeignPtrToPtr ringStart
as = unsafeForeignPtrToPtr aStart
assert (aBound `minusPtr` as >= ringBound `minusPtr` rs) (return ())
assert (aEnd `minusPtr` as >= ringBound `minusPtr` rs) (return ())
let len = ringBound `minusPtr` rh
r1 <- A.memcmp (castPtr rh) (castPtr as) (min len n)
r2 <- if n > len
@ -127,7 +127,7 @@ unsafeEqArray Ring{..} rh A.Array{..} =
let !res = A.unsafeInlineIO $ do
let rs = unsafeForeignPtrToPtr ringStart
let as = unsafeForeignPtrToPtr aStart
assert (aBound `minusPtr` as >= ringBound `minusPtr` rs)
assert (aEnd `minusPtr` as >= ringBound `minusPtr` rs)
(return ())
let len = ringBound `minusPtr` rh
r1 <- A.memcmp (castPtr rh) (castPtr as) len

View File

@ -382,6 +382,7 @@ library
, Streamly.Internal.Data.Prim.Array
, Streamly.Internal.Data.SmallArray.Types
, Streamly.Internal.Data.SmallArray
, Streamly.Internal.Memory.Mutable.Array.Types
, Streamly.Internal.Memory.Array.Types
, Streamly.Internal.Memory.Array
, Streamly.Internal.Memory.ArrayStream