Use unfolds instead of streams for array reading

This commit is contained in:
Harendra Kumar 2019-09-29 20:49:30 +05:30
parent a5b8b07630
commit 58d1d61afd
16 changed files with 101 additions and 89 deletions

View File

@ -30,7 +30,8 @@
add newlines to a stream, `Streamly.Data.String.encodeUtf8` for encoding and
`Streamly.FileSystem.Handle.write` for writing to a file handle.
* Deprecate `scanx`, `foldx`, `foldxM`, `foldr1`
* Remove deprecated APIs `scan`, `foldl`, `foldlM`
* Remove deprecated APIs `foldl`, `foldlM`
* Replace deprecated API `scan` with a new signature, to scan using Fold.
* In `Streamly` module:
* `runStream` has been deprecated, please use `Streamly.Prelude.drain`
@ -87,11 +88,10 @@ example, concat streams concurrently using this.
character streams and other character stream operations.
* _Arrays_: `Streamly.Memory.Array` module provides arrays for efficient
in-memory buffering and efficient interfacing with IO.
* `Streamly.Memory.ArrayStream` module provide combinators to work on streams
of arrays.
* Add the following to `Streamly.Prelude`:
* `runFold`, `runScan` and `runPostscan`
* `unfold`, `fold`, `scan` and `postscan`
* `concatUnfold` to concat a stream after unfolding each element
* `intervalsOf` and `chunksOf`
* `splitOn`, `splitOnSuffix`, `splitWithSuffix`, and `wordsBy`
* `groups`, `groupsBy` and `groupsByRolling`

View File

@ -13,6 +13,7 @@ import System.Random (randomRIO)
import qualified GHC.Exts as GHC
import qualified ArrayOps as Ops
import qualified Streamly.Internal.Memory.Array as IA
import qualified Streamly.Memory.Array as A
import qualified Streamly.Prelude as S
@ -87,8 +88,8 @@ main =
, benchPureSink "min" Ops.ordInstanceMin
, benchPureSink "IsList.toList" GHC.toList
, benchIOSink "foldl'" Ops.pureFoldl'
, benchIOSink "read" (S.drain . A.read)
, benchIOSink "readRev" (S.drain . A.readRev)
, benchIOSink "read" (S.drain . S.unfold A.read)
, benchIOSink "toStreamRev" (S.drain . IA.toStreamRev)
#ifdef DEVBUILD
, benchPureSink "foldable/foldl'" Ops.foldableFoldl'
, benchPureSink "foldable/sum" Ops.foldableSum

View File

@ -280,7 +280,7 @@ onArray
:: MonadIO m => (S.SerialT m Int -> S.SerialT m Int)
-> Stream Int
-> m (Stream Int)
onArray f arr = S.fold (A.writeN value) $ f $ A.read arr
onArray f arr = S.fold (A.writeN value) $ f $ (S.unfold A.read arr)
scanl' n = composeN n $ onArray $ S.scanl' (+) 0
scanl1' n = composeN n $ onArray $ S.scanl1' (+)
@ -511,7 +511,7 @@ readInstance str =
{-# INLINE pureFoldl' #-}
pureFoldl' :: MonadIO m => Stream Int -> m Int
pureFoldl' = S.foldl' (+) 0 . A.read
pureFoldl' = S.foldl' (+) 0 . S.unfold A.read
#ifdef DEVBUILD
{-# INLINE foldableFoldl' #-}

View File

@ -23,7 +23,7 @@ main = do
withFile file AppendMode
(\src -> S.fold (FH.write src)
$ encodeChar8Unchecked
$ S.concatMap A.read
$ S.concatUnfold A.read
$ S.concatMapWith parallel (flip NS.withSocketS recv)
$ NS.connectionsOnAllAddrs 8090)

View File

@ -19,16 +19,17 @@ import Foreign.Storable (Storable(..))
import qualified Streamly.Data.String as Streamly
import qualified Streamly.FileSystem.Handle as FH
import qualified Streamly.Data.Fold as FL
import qualified Streamly.Internal.Data.Fold as Internal
import qualified Streamly.Internal.Data.Fold as IFL
import qualified Streamly.Internal.Data.Unfold as IUF
import qualified Streamly.Memory.Array as A
import qualified Streamly.Prelude as Streamly
import System.Environment (getArgs)
import System.IO (openFile, IOMode(..))
instance (Enum a, Storable a) => Hashable (A.Array a) where
hash arr = runIdentity $ Streamly.fold Internal.rollingHash (A.read arr)
hash arr = runIdentity $ IUF.fold A.read IFL.rollingHash arr
hashWithSalt salt arr = runIdentity $
Streamly.fold (Internal.rollingHashWithSalt salt) (A.read arr)
IUF.fold A.read (IFL.rollingHashWithSalt salt) arr
{-# INLINE toLower #-}
toLower :: Char -> Char

View File

@ -44,6 +44,7 @@ import qualified Streamly.Memory.Array as A
import qualified Streamly.Prelude as S
import qualified Streamly.Data.String as SS
import qualified Streamly.Internal.Prelude as Internal
import qualified Streamly.Internal.Memory.Array as IA
import qualified Streamly.Internal.Memory.ArrayStream as AS
#ifdef INSPECTION
@ -104,7 +105,7 @@ inspect $ 'countWords `hasNoType` ''Step
{-# INLINE sumBytes #-}
sumBytes :: Handle -> IO Word8
sumBytes inh = do
let foldlArr' f z = runIdentity . S.foldl' f z . A.read
let foldlArr' f z = runIdentity . S.foldl' f z . IA.toStream
let s = FH.readArrays inh
S.foldl' (\acc arr -> acc + foldlArr' (+) 0 arr) 0 s

View File

@ -135,7 +135,7 @@ countLinesU inh =
S.length
$ SS.foldLines FL.drain
$ SS.decodeChar8
$ IP.concatMapU AT.readU (FH.readArrays inh)
$ S.concatUnfold A.read (FH.readArrays inh)
#ifdef INSPECTION
inspect $ hasNoTypeClasses 'countLinesU
@ -344,7 +344,7 @@ isSp = isSpace . chr . fromIntegral
wordsUnwordsCopyWord8 :: Handle -> Handle -> IO ()
wordsUnwordsCopyWord8 inh outh =
S.fold (FH.write outh)
$ IP.concatMapU IUF.fromList
$ S.concatUnfold IUF.fromList
$ S.intersperse [32]
$ S.wordsBy isSp FL.toList
$ FH.read inh
@ -361,7 +361,7 @@ wordsUnwordsCopy :: Handle -> Handle -> IO ()
wordsUnwordsCopy inh outh =
S.fold (FH.write outh)
$ SS.encodeChar8
$ IP.concatMapU IUF.fromList
$ S.concatUnfold IUF.fromList
$ S.intersperse " "
-- Array allocation is too expensive for such small strings. So just use
-- lists instead.

View File

@ -720,7 +720,7 @@ inspect $ hasNoTypeClasses 'concatMapRepl4xN
{-# INLINE concatUnfoldRepl4xN #-}
concatUnfoldRepl4xN :: Int -> IO ()
concatUnfoldRepl4xN n =
S.drain $ Internal.concatMapU
S.drain $ S.concatUnfold
(UF.replicateM 4)
(sourceUnfoldrMN (value `div` 4) n)

View File

@ -95,7 +95,6 @@ import Streamly.Internal.Data.Unfold (Unfold)
import qualified Streamly.Internal.Prelude as S
import qualified Streamly.Memory.Array as A
import qualified Streamly.Internal.Memory.Array.Types as A
import qualified Streamly.Internal.Memory.ArrayStream as AS
import qualified Streamly.Streams.StreamD as D
import qualified Streamly.Internal.Data.Unfold as UF
@ -284,14 +283,14 @@ unfoldLines unf = S.intercalateSuffix UF.singleton '\n' unf
-- >>> S.toList $ unlines $ S.fromList ["lines", "this", "string"]
-- "lines\nthis\nstring\n"
--
-- > unlines = unfoldLines A.readU
-- > unlines = unfoldLines A.read
--
-- Note that, in general
--
-- > unlines . lines /= id
{-# INLINE unlines #-}
unlines :: (MonadIO m, IsStream t) => t m (Array Char) -> t m Char
unlines = unfoldLines A.readU
unlines = unfoldLines A.read
-- | Flattens the stream of @Array Char@, after appending a separating
-- space to each string.

View File

@ -64,8 +64,9 @@ module Streamly.Internal.Memory.Array
-- list.
, A.toList
, A.read
, readRev
, toStream
, toStreamRev
, read
-- * Random Access
, length
@ -96,19 +97,25 @@ where
import Control.Monad.IO.Class (MonadIO(..))
-- import Data.Functor.Identity (Identity)
import Foreign.ForeignPtr (withForeignPtr)
import Foreign.ForeignPtr (withForeignPtr, touchForeignPtr)
import Foreign.Ptr (plusPtr)
import Foreign.Storable (Storable(..))
import Prelude hiding (length, null, last, map, (!!), read, concat)
import GHC.ForeignPtr (ForeignPtr(..))
import GHC.Ptr (Ptr(..))
import Streamly.Internal.Data.Fold.Types (Fold(..))
import Streamly.Internal.Data.Unfold.Types (Unfold(..))
import Streamly.Internal.Memory.Array.Types (Array(..), length)
import Streamly.Streams.Serial (SerialT)
import Streamly.Streams.StreamK.Type (IsStream)
import Streamly.Internal.Data.Fold.Types (Fold(..))
import qualified Streamly.Internal.Memory.Array.Types as A
import qualified Streamly.Streams.Prelude as P
import qualified Streamly.Streams.Serial as Serial
import qualified Streamly.Streams.StreamD as D
import qualified Streamly.Streams.StreamK as K
-------------------------------------------------------------------------------
-- Construction
@ -151,16 +158,56 @@ fromStream = P.runFold A.write
-- Elimination
-------------------------------------------------------------------------------
-- | Convert an 'Array' into a stream.
--
-- /Internal/
{-# INLINE_EARLY toStream #-}
toStream :: (Monad m, K.IsStream t, Storable a) => Array a -> t m a
toStream = D.fromStreamD . A.toStreamD
-- XXX add fallback to StreamK rule
-- {-# RULES "Streamly.Array.read fallback to StreamK" [1]
-- forall a. S.readK (read a) = K.fromArray a #-}
-- | Convert an 'Array' into a stream in reverse order.
--
-- @since 0.7.0
{-# INLINE_EARLY readRev #-}
readRev :: (Monad m, IsStream t, Storable a) => Array a -> t m a
readRev = D.fromStreamD . A.toStreamDRev
-- /Internal/
{-# INLINE_EARLY toStreamRev #-}
toStreamRev :: (Monad m, IsStream t, Storable a) => Array a -> t m a
toStreamRev = D.fromStreamD . A.toStreamDRev
-- XXX add fallback to StreamK rule
-- {-# RULES "Streamly.Array.readRev fallback to StreamK" [1]
-- forall a. S.toStreamK (readRev a) = K.revFromArray a #-}
data ReadUState a = ReadUState
{-# UNPACK #-} !(ForeignPtr a) -- foreign ptr with end of array pointer
{-# UNPACK #-} !(Ptr a) -- current pointer
-- | Unfold an array into a stream.
--
-- @since 0.7.0
{-# INLINE_NORMAL read #-}
read :: forall m a. (Monad m, Storable a) => Unfold m (Array a) a
read = Unfold step inject
where
inject (Array (ForeignPtr start contents) (Ptr end) _) =
return $ ReadUState (ForeignPtr end contents) (Ptr start)
{-# INLINE_LATE step #-}
step (ReadUState fp@(ForeignPtr end _) p) | p == (Ptr end) =
let x = A.unsafeInlineIO $ touchForeignPtr fp
in x `seq` return D.Stop
step (ReadUState fp p) = do
-- unsafeInlineIO allows us to run this in Identity monad for pure
-- toList/foldr case which makes them much faster due to not
-- accumulating the list and fusing better with the pure consumers.
--
-- This should be safe as the array contents are guaranteed to be
-- evaluated/written to before we peek at them.
let !x = A.unsafeInlineIO $ peek p
return $ D.Yield x
(ReadUState fp (p `plusPtr` (sizeOf (undefined :: a))))
-- | > null arr = length arr == 0
--
-- /Internal/
@ -375,18 +422,18 @@ streamTransform :: forall m a b. (MonadIO m, Storable a, Storable b)
=> (SerialT m a -> SerialT m b) -> Array a -> m (Array b)
streamTransform f arr =
P.runFold (A.toArrayMinChunk (alignment (undefined :: a)) (length arr))
$ f (A.read arr)
$ f (toStream arr)
-- | Fold an array using a 'Fold'.
--
-- /Internal/
{-# INLINE fold #-}
fold :: forall m a b. (MonadIO m, Storable a) => Fold m a b -> Array a -> m b
fold f arr = P.runFold f $ (A.read arr :: Serial.SerialT m a)
fold f arr = P.runFold f $ (toStream arr :: Serial.SerialT m a)
-- | Fold an array using a stream fold operation.
--
-- /Internal/
{-# INLINE streamFold #-}
streamFold :: (MonadIO m, Storable a) => (SerialT m a -> m b) -> Array a -> m b
streamFold f arr = f (A.read arr)
streamFold f arr = f (toStream arr)

View File

@ -68,8 +68,6 @@ module Streamly.Internal.Memory.Array.Types
, writeNUnsafe
, write
, writeAligned
, read
, readU
-- * Utilities
, defaultChunkSize
@ -111,7 +109,6 @@ import GHC.IO (IO(IO), unsafePerformIO)
import GHC.Ptr (Ptr(..))
import Streamly.Internal.Data.Fold.Types (Fold(..))
import Streamly.Internal.Data.Unfold.Types (Unfold(..))
import Streamly.Internal.Data.Strict (Tuple'(..))
import Streamly.Internal.Data.SVar (adaptState)
@ -407,33 +404,6 @@ byteCapacity Array{..} =
len = aBound `minusPtr` p
in assert (len >= 0) len
data ReadUState a = ReadUState
{-# UNPACK #-} !(ForeignPtr a) -- foreign ptr with end of array pointer
{-# UNPACK #-} !(Ptr a) -- current pointer
{-# INLINE_NORMAL readU #-}
readU :: forall m a. (Monad m, Storable a) => Unfold m (Array a) a
readU = Unfold step inject
where
inject (Array (ForeignPtr start contents) (Ptr end) _) =
return $ ReadUState (ForeignPtr end contents) (Ptr start)
{-# INLINE_LATE step #-}
step (ReadUState fp@(ForeignPtr end _) p) | p == (Ptr end) =
let x = unsafeInlineIO $ touchForeignPtr fp
in x `seq` return D.Stop
step (ReadUState fp p) = do
-- unsafeInlineIO allows us to run this in Identity monad for pure
-- toList/foldr case which makes them much faster due to not
-- accumulating the list and fusing better with the pure consumers.
--
-- This should be safe as the array contents are guaranteed to be
-- evaluated/written to before we peek at them.
let !x = unsafeInlineIO $ peek p
return $ D.Yield x
(ReadUState fp (p `plusPtr` (sizeOf (undefined :: a))))
{-# INLINE_NORMAL toStreamD #-}
toStreamD :: forall m a. (Monad m, Storable a) => Array a -> D.Stream m a
toStreamD Array{..} =
@ -905,16 +875,6 @@ instance Foldable Array where
foldr = _foldr
#endif
-- | Convert an 'Array' into a stream.
--
-- @since 0.7.0
{-# INLINE_EARLY read #-}
read :: (Monad m, K.IsStream t, Storable a) => Array a -> t m a
read = D.fromStreamD . toStreamD
-- XXX add fallback to StreamK rule
-- {-# RULES "Streamly.Array.read fallback to StreamK" [1]
-- forall a. S.readK (read a) = K.fromArray a #-}
-------------------------------------------------------------------------------
-- Semigroup and Monoid
-------------------------------------------------------------------------------

View File

@ -52,6 +52,7 @@ import Streamly.Streams.Serial (SerialT)
import Streamly.Streams.StreamK.Type (IsStream)
import qualified Streamly.Internal.Data.Unfold as UF
import qualified Streamly.Internal.Memory.Array as A
import qualified Streamly.Internal.Memory.Array.Types as A
import qualified Streamly.Internal.Prelude as S
import qualified Streamly.Streams.StreamD as D
@ -72,7 +73,7 @@ import qualified Streamly.Streams.Prelude as P
concat :: (IsStream t, MonadIO m, Storable a) => t m (Array a) -> t m a
-- concat m = D.fromStreamD $ A.flattenArrays (D.toStreamD m)
-- concat m = D.fromStreamD $ D.concatMap A.toStreamD (D.toStreamD m)
concat m = D.fromStreamD $ D.concatMapU A.readU (D.toStreamD m)
concat m = D.fromStreamD $ D.concatMapU A.read (D.toStreamD m)
-- XXX should we have a reverseArrays API to reverse the stream of arrays
-- instead?
@ -88,7 +89,7 @@ concatRev m = D.fromStreamD $ A.flattenArraysRev (D.toStreamD m)
{-# INLINE intercalateSuffix #-}
intercalateSuffix :: (MonadIO m, IsStream t, Storable a)
=> Array a -> t m (Array a) -> t m a
intercalateSuffix arr = S.intercalateSuffix A.readU arr A.readU
intercalateSuffix arr = S.intercalateSuffix A.read arr A.read
-- | Flatten a stream of arrays appending the given element after each
-- array.
@ -98,7 +99,7 @@ intercalateSuffix arr = S.intercalateSuffix A.readU arr A.readU
interposeSuffix :: (MonadIO m, IsStream t, Storable a)
=> a -> t m (Array a) -> t m a
-- interposeSuffix x = D.fromStreamD . A.unlines x . D.toStreamD
interposeSuffix x = S.intercalateSuffix UF.singleton x A.readU
interposeSuffix x = S.intercalateSuffix UF.singleton x A.read
-- | Split a stream of arrays on a given separator byte, dropping the separator
-- and coalescing all the arrays between two separators into a single array.

View File

@ -248,7 +248,7 @@ module Streamly.Internal.Prelude
-- ** Nested Streams
, concatMapM
, concatMapU
, concatUnfold
, concatUnfoldInterleave
, concatUnfoldRoundrobin
, concatMap
@ -426,7 +426,7 @@ import Streamly.Internal.Data.Time.Units
import Streamly.Internal.Data.Strict
import qualified Streamly.Internal.Memory.Array.Types as A
import qualified Streamly.Internal.Memory.Array as A
import qualified Streamly.Data.Fold as FL
import qualified Streamly.Internal.Data.Fold.Types as FL
import qualified Streamly.Streams.Prelude as P
@ -542,7 +542,7 @@ unfoldrMSerial step seed = fromStreamS (S.unfoldrM step seed)
--
-- >>> unfold UF.replicateM 10 (putStrLn "hello")
--
-- /Internal/
-- /Since: 0.7.0/
{-# INLINE unfold #-}
unfold :: (IsStream t, Monad m) => Unfold m a b -> a -> t m b
unfold unf x = fromStreamD $ D.unfold unf x
@ -2210,9 +2210,9 @@ concatMapM f m = fromStreamD $ D.concatMapM (fmap toStreamD . f) (toStreamD m)
-- therefore provide many times better performance.
--
-- @since 0.7.0
{-# INLINE concatMapU #-}
concatMapU ::(IsStream t, Monad m) => Unfold m a b -> t m a -> t m b
concatMapU u m = fromStreamD $ D.concatMapU u (toStreamD m)
{-# INLINE concatUnfold #-}
concatUnfold ::(IsStream t, Monad m) => Unfold m a b -> t m a -> t m b
concatUnfold u m = fromStreamD $ D.concatMapU u (toStreamD m)
-- | Like 'concatUnfold' but interleaves the streams in the same way as
-- 'interleave' behaves instead of appending them.
@ -2988,7 +2988,8 @@ wordsOn subseq f m = undefined -- D.fromStreamD $ D.wordsOn f subseq (D.toStream
splitBySeq
:: (IsStream t, MonadAsync m, Storable a, Enum a, Eq a)
=> Array a -> Fold m a b -> t m a -> t m b
splitBySeq patt f m = intersperseM (fold f (A.read patt)) $ splitOnSeq patt f m
splitBySeq patt f m =
intersperseM (fold f (A.toStream patt)) $ splitOnSeq patt f m
-- | Like 'splitSuffixOn' but keeps the suffix intact in the splits.
--

View File

@ -90,7 +90,6 @@ module Streamly.Memory.Array
, A.toList
, A.read
, readRev
-- ** Random Access
, A.length

View File

@ -92,6 +92,7 @@ module Streamly.Prelude
-- | Generate a monadic stream from a seed value and a generator function.
, unfoldr
, unfoldrM
, unfold
, iterate
, iterateM
, fromIndices
@ -662,6 +663,7 @@ module Streamly.Prelude
--, bindWith
, concatMap
, concatMapM
, concatUnfold
-- * Exceptions
, before

View File

@ -11,8 +11,8 @@ import Test.QuickCheck.Monadic (monadicIO, assert)
import Test.Hspec as H
import qualified Streamly.Internal.Memory.Array as IA
import qualified Streamly.Memory.Array as A
import qualified Streamly.Internal.Memory.Array.Types as AT
import qualified Streamly.Prelude as S
import qualified Streamly.Internal.Prelude as IP
@ -55,7 +55,7 @@ testFromToStreamN =
arr <- S.fold (A.writeN len)
$ S.fromList list
xs <- S.toList
$ A.read arr
$ S.unfold A.read arr
assert (xs == list)
testToStreamRev :: Property
@ -66,7 +66,7 @@ testToStreamRev =
arr <- S.fold (A.writeN len)
$ S.fromList list
xs <- S.toList
$ A.readRev arr
$ IA.toStreamRev arr
assert (xs == reverse list)
testArraysOf :: Property
@ -75,7 +75,7 @@ testArraysOf =
forAll (vectorOf len (arbitrary :: Gen Int)) $ \list ->
monadicIO $ do
xs <- S.toList
$ S.concatMap A.read
$ S.concatUnfold A.read
$ IP.arraysOf 240
$ S.fromList list
assert (xs == list)
@ -86,7 +86,7 @@ testFlattenArrays =
forAll (vectorOf len (arbitrary :: Gen Int)) $ \list ->
monadicIO $ do
xs <- S.toList
$ IP.concatMapU AT.readU
$ S.concatUnfold A.read
$ IP.arraysOf 240
$ S.fromList list
assert (xs == list)
@ -98,7 +98,7 @@ testFromToStream =
monadicIO $ do
arr <- S.fold A.write $ S.fromList list
xs <- S.toList
$ A.read arr
$ S.unfold A.read arr
assert (xs == list)
main :: IO ()
@ -109,7 +109,7 @@ main = hspec
describe "Construction" $ do
prop "length . writeN n === n" $ testLength
prop "read . writeN n === id" $ testFromToStreamN
prop "readRev . write === reverse" $ testToStreamRev
prop "toStreamRev . write === reverse" $ testToStreamRev
prop "arraysOf concats to original" $ testArraysOf
prop "concats to original" $ testFlattenArrays
prop "read . write === id" $ testFromToStream