Add fold and parse for array streams

The test and benchmark code is taken from Adithya's original
implementation based on double-ended lists.
Harendra Kumar 2021-04-27 02:37:14 +05:30
parent 613a85f988
commit 3339c2082d
7 changed files with 329 additions and 11 deletions

View File

@@ -23,18 +23,26 @@ module Main
main
) where
import Control.DeepSeq (NFData(..))
import Control.Monad (void)
import Control.Monad.Catch (MonadThrow)
import Data.Functor.Identity (runIdentity)
import Data.Word (Word8)
import System.IO (Handle)
import System.Random (randomRIO)
import Prelude hiding ()
import qualified Streamly.Prelude as Stream
import qualified Streamly.Internal.Data.Array.Foreign as Array
import qualified Streamly.Internal.Data.Array.Stream.Foreign as ArrayStream
import qualified Streamly.Internal.Data.Fold as Fold
import qualified Streamly.Internal.Data.Parser.ParserD as ParserD
import qualified Streamly.Internal.Data.Stream.IsStream as Stream (arraysOf)
import qualified Streamly.Internal.FileSystem.Handle as Handle
import qualified Streamly.Internal.Unicode.Stream as Unicode
import Gauge hiding (env)
import Streamly.Prelude (SerialT, MonadAsync, IsStream)
import Streamly.Benchmark.Common
import Streamly.Benchmark.Common.Handle
@@ -44,6 +52,28 @@ import Streamly.Internal.Data.Stream.StreamD.Type (Step(..))
import Test.Inspection
#endif
-------------------------------------------------------------------------------
-- Utilities
-------------------------------------------------------------------------------
-- XXX these can be moved to the common module
{-# INLINE sourceUnfoldrM #-}
sourceUnfoldrM :: (IsStream t, MonadAsync m) => Int -> Int -> t m Int
sourceUnfoldrM value n = Stream.unfoldrM step n
where
step cnt =
if cnt > n + value
then return Nothing
else return (Just (cnt, cnt + 1))
{-# INLINE benchIO #-}
benchIO
:: NFData b
=> String -> (Int -> t IO a) -> (t IO a -> IO b) -> Benchmark
benchIO name src sink =
bench name $ nfIO $ randomRIO (1,1) >>= sink . src
-------------------------------------------------------------------------------
-- read chunked using toChunks
-------------------------------------------------------------------------------
@@ -183,6 +213,37 @@ o_1_space_copy_toChunks_group_ungroup env =
]
]
-------------------------------------------------------------------------------
-- Parsers
-------------------------------------------------------------------------------
{-# INLINE drainWhile #-}
drainWhile :: MonadThrow m => (a -> Bool) -> ParserD.Parser m a ()
drainWhile p = ParserD.takeWhile p Fold.drain
-------------------------------------------------------------------------------
-- Folds and parsers
-------------------------------------------------------------------------------
{-# INLINE fold #-}
fold :: SerialT IO (Array.Array Int) -> IO ()
fold s = void $ ArrayStream.fold Fold.drain s
{-# INLINE parseArray #-}
parseArray :: Int -> SerialT IO (Array.Array Int) -> IO ()
parseArray value s = void $ ArrayStream.parse (drainWhile (< value)) s
o_1_space_serial_array ::
Int -> [Array.Array Int] -> [Array.Array Int] -> [Benchmark]
o_1_space_serial_array bound arraysSmall arraysBig =
[ benchIO "fold (of 100)" (\_ -> Stream.fromList arraysSmall) fold
, benchIO "fold (single)" (\_ -> Stream.fromList arraysBig) fold
, benchIO "parse (of 100)" (\_ -> Stream.fromList arraysSmall)
$ parseArray bound
, benchIO "parse (single)" (\_ -> Stream.fromList arraysBig)
$ parseArray bound
]
-------------------------------------------------------------------------------
-- Driver
-------------------------------------------------------------------------------
@@ -193,13 +254,20 @@ moduleName = "Data.Array.Stream.Foreign"
main :: IO ()
main = do
env <- mkHandleBenchEnv
defaultMain (allBenchmarks env)
runWithCLIOptsEnv defaultStreamSize alloc (allBenchmarks env)
where
allBenchmarks env =
[ bgroup (o_1_space_prefix moduleName)
( o_1_space_read_chunked env
++ o_1_space_copy_toChunks_group_ungroup env
)
alloc value = do
small <- Stream.toList $ Stream.arraysOf 100 $ sourceUnfoldrM value 0
big <- Stream.toList $ Stream.arraysOf value $ sourceUnfoldrM value 0
return (small, big)
allBenchmarks env arrays value =
let (arraysSmall, arraysBig) = arrays
in [ bgroup (o_1_space_prefix moduleName) $ Prelude.concat
[ o_1_space_read_chunked env
, o_1_space_serial_array value arraysSmall arraysBig
, o_1_space_copy_toChunks_group_ungroup env
]
]

View File

@@ -51,7 +51,9 @@ cradle:
- path: "./test/Streamly/Test/Data/Array.hs"
component: "test:Data.Array"
- path: "./test/Streamly/Test/Data/Array/Foreign.hs"
component: "Data.Array.Foreign"
component: "test:Data.Array.Foreign"
- path: "./test/Streamly/Test/Data/Array/Stream/Foreign.hs"
component: "test:Data.Array.Stream.Foreign"
- path: "./test/Streamly/Test/Data/Fold.hs"
component: "test:Data.Fold"
- path: "./test/Streamly/Test/Data/List.hs"

View File

@@ -23,7 +23,7 @@
-- >>> import qualified Streamly.Internal.Data.Stream.IsStream as Stream (arraysOf)
-- >>> import qualified Streamly.Prelude as Stream
--
-- >>> ArrayStream.fold (ArrayFold.fromFold (Fold.take 7 Fold.toList)) $ Stream.arraysOf 5 $ Stream.fromList "hello world"
-- >>> ArrayStream.foldArr (ArrayFold.fromFold (Fold.take 7 Fold.toList)) $ Stream.arraysOf 5 $ Stream.fromList "hello world"
-- "hello w"
--
module Streamly.Internal.Data.Array.Stream.Fold.Foreign

View File

@@ -22,6 +22,11 @@ module Streamly.Internal.Data.Array.Stream.Foreign
, unlines
-- * Elimination
-- ** Element Folds
, fold
, parse
, parseD
-- ** Array Folds
, foldArr
, foldArr_
@@ -50,6 +55,9 @@ import Data.Bifunctor (second)
import Control.Exception (assert)
import Control.Monad.Catch (MonadThrow, throwM)
import Control.Monad.IO.Class (MonadIO(..))
#if __GLASGOW_HASKELL__ < 808
import Data.Semigroup (Semigroup(..))
#endif
import Data.Word (Word8)
import Foreign.ForeignPtr (touchForeignPtr)
import Foreign.ForeignPtr.Unsafe (unsafeForeignPtrToPtr)
@@ -317,7 +325,65 @@ splitOnSuffix byte s =
D.fromStreamD $ D.splitInnerBySuffix (A.breakOn byte) A.spliceTwo $ D.toStreamD s
-------------------------------------------------------------------------------
-- Elimination
-- Elimination - Running folds
-------------------------------------------------------------------------------
-- XXX This should be written using CPS (as foldK) if we want it to scale wrt
-- to the number of times it can be called on the same stream.
--
{-# INLINE_NORMAL foldD #-}
foldD :: forall m a b. (MonadIO m, Storable a) =>
Fold m a b -> D.Stream m (Array a) -> m (b, D.Stream m (Array a))
foldD (Fold fstep initial extract) stream@(D.Stream step state) = do
res <- initial
case res of
FL.Partial fs -> go SPEC state fs
FL.Done fb -> return $! (fb, stream)
where
{-# INLINE go #-}
go !_ st !fs = do
r <- step defState st
case r of
D.Yield (Array (ForeignPtr start contents) (Ptr end)) s ->
goArray SPEC s (ForeignPtr end contents) (Ptr start) fs
D.Skip s -> go SPEC s fs
D.Stop -> do
b <- extract fs
return (b, D.nil)
goArray !_ s fp@(ForeignPtr end _) !cur !fs
| cur == Ptr end = do
liftIO $ touchForeignPtr fp
go SPEC s fs
goArray !_ st fp@(ForeignPtr end contents) !cur !fs = do
x <- liftIO $ peek cur
res <- fstep fs x
let elemSize = sizeOf (undefined :: a)
next = cur `plusPtr` elemSize
case res of
FL.Done b -> do
let !(Ptr curAddr) = next
arr = Array (ForeignPtr curAddr contents) (Ptr end)
return $! (b, D.cons arr (D.Stream step st))
FL.Partial fs1 -> goArray SPEC st fp next fs1
-- | Fold an array stream using the supplied 'Fold'. Returns the fold result
-- and the unconsumed stream.
--
-- /Internal/
--
{-# INLINE_NORMAL fold #-}
fold ::
(MonadIO m, Storable a)
=> FL.Fold m a b
-> SerialT m (A.Array a)
-> m (b, SerialT m (A.Array a))
fold f s = fmap D.fromStreamD <$> foldD f (D.toStreamD s)
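-- A hedged usage sketch (illustrative, not part of this diff): running an
-- element 'Fold' over a chunked stream and continuing with the unconsumed
-- remainder. The function name and literal inputs are assumptions; the
-- qualified imports mirror the ones used in the benchmark and doctests.
import qualified Streamly.Prelude as Stream
import qualified Streamly.Internal.Data.Stream.IsStream as Stream (arraysOf)
import qualified Streamly.Internal.Data.Array.Stream.Foreign as ArrayStream
import qualified Streamly.Internal.Data.Fold as Fold

exampleFold :: IO ()
exampleFold = do
    -- 20 Ints split into arrays of 5 elements each
    let chunks = Stream.arraysOf 5 (Stream.fromList [1 .. 20 :: Int])
    -- Fold the first 7 elements; 'rest' is the unconsumed part of the stream.
    (total, rest) <- ArrayStream.fold (Fold.take 7 Fold.sum) chunks
    print total                                           -- expected: 28
    Stream.toList (ArrayStream.concat rest) >>= print     -- expected: [8..20]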
-------------------------------------------------------------------------------
-- Fold to a single Array
-------------------------------------------------------------------------------
-- When we have to take an array partially, take the last part of the array.
@@ -459,7 +525,7 @@ _toArraysOf n = FL.chunksOf n (A.writeNF n) FL.toStream
-}
-------------------------------------------------------------------------------
-- Parsing
-- Elimination - running element parsers
-------------------------------------------------------------------------------
-- GHC parser does not accept {-# ANN type [] NoSpecConstr #-}, so we need
@@ -467,6 +533,110 @@ _toArraysOf n = FL.chunksOf n (A.writeNF n) FL.toStream
{-# ANN type List NoSpecConstr #-}
newtype List a = List {getList :: [a]}
-- This can be generalized to any type provided it can be unfolded to a stream
-- and it can be combined using a semigroup operation.
--
-- XXX This should be written using CPS (as parseK) if we want it to scale wrt
-- to the number of times it can be called on the same stream.
{-# INLINE_NORMAL parseD #-}
parseD ::
forall m a b. (MonadIO m, MonadThrow m, Storable a)
=> PRD.Parser m a b
-> D.Stream m (Array.Array a)
-> m (b, D.Stream m (Array.Array a))
parseD (PRD.Parser pstep initial extract) stream@(D.Stream step state) = do
res <- initial
case res of
PRD.IPartial s -> go SPEC state (List []) s
PRD.IDone b -> return (b, stream)
PRD.IError err -> throwM $ ParseError err
where
-- "backBuf" contains last few items in the stream that we may have to
-- backtrack to.
--
-- XXX currently we are using a dumb list based approach for backtracking
-- buffer. This can be replaced by a sliding/ring buffer using Data.Array.
-- That will allow us more efficient random back and forth movement.
{-# INLINE go #-}
go !_ st backBuf !pst = do
r <- step defState st
case r of
D.Yield (Array (ForeignPtr start contents) (Ptr end)) s ->
gobuf SPEC s backBuf (ForeignPtr end contents) (Ptr start) pst
D.Skip s -> go SPEC s backBuf pst
D.Stop -> do
b <- extract pst
return (b, D.nil)
-- Use strictness on "cur" to keep it unboxed
gobuf !_ s backBuf fp@(ForeignPtr end _) !cur !pst
| cur == Ptr end = do
liftIO $ touchForeignPtr fp
go SPEC s backBuf pst
gobuf !_ s backBuf fp@(ForeignPtr end contents) !cur !pst = do
x <- liftIO $ peek cur
pRes <- pstep pst x
let elemSize = sizeOf (undefined :: a)
next = cur `plusPtr` elemSize
case pRes of
PR.Partial 0 pst1 ->
gobuf SPEC s (List []) fp next pst1
PR.Partial n pst1 -> do
assert (n <= Prelude.length (x:getList backBuf)) (return ())
let src0 = Prelude.take n (x:getList backBuf)
arr0 = A.fromListN n (Prelude.reverse src0)
!(Ptr curAddr) = next
arr1 = Array (ForeignPtr curAddr contents) (Ptr end)
src = arr0 <> arr1
let !(Array (ForeignPtr start cont1) (Ptr end1)) = src
gobuf SPEC s (List []) (ForeignPtr end1 cont1) (Ptr start) pst1
PR.Continue 0 pst1 ->
gobuf SPEC s (List (x:getList backBuf)) fp next pst1
PR.Continue n pst1 -> do
assert (n <= Prelude.length (x:getList backBuf)) (return ())
let (src0, buf1) = splitAt n (x:getList backBuf)
arr0 = A.fromListN n (Prelude.reverse src0)
!(Ptr curAddr) = next
arr1 = Array (ForeignPtr curAddr contents) (Ptr end)
src = arr0 <> arr1
let !(Array (ForeignPtr start cont1) (Ptr end1)) = src
gobuf SPEC s (List buf1) (ForeignPtr end1 cont1) (Ptr start) pst1
PR.Done 0 b -> do
let !(Ptr curAddr) = next
arr = Array (ForeignPtr curAddr contents) (Ptr end)
return (b, D.cons arr (D.Stream step s))
PR.Done n b -> do
assert (n <= Prelude.length (x:getList backBuf)) (return ())
let src0 = Prelude.take n (x:getList backBuf)
-- XXX create the array in reverse instead
arr0 = A.fromListN n (Prelude.reverse src0)
!(Ptr curAddr) = next
arr1 = Array (ForeignPtr curAddr contents) (Ptr end)
-- XXX Use StreamK to avoid adding arbitrary layers of
-- constructors every time.
str = D.cons arr0 (D.cons arr1 (D.Stream step s))
return (b, str)
PR.Error err -> throwM $ ParseError err
-- | Parse an array stream using the supplied 'Parser'. Returns the parse
-- result and the unconsumed stream. Throws 'ParseError' if the parse fails.
--
-- /Internal/
--
{-# INLINE_NORMAL parse #-}
parse ::
(MonadIO m, MonadThrow m, Storable a)
=> PRD.Parser m a b
-> SerialT m (A.Array a)
-> m (b, SerialT m (A.Array a))
parse p s = fmap D.fromStreamD <$> parseD p (D.toStreamD s)
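-- A hedged usage sketch (illustrative, not part of this diff): running a
-- ParserD parser over a chunked stream, along the lines of the property test
-- added by this commit. The function name and literal inputs are assumptions.
import qualified Streamly.Prelude as Stream
import qualified Streamly.Internal.Data.Stream.IsStream as Stream (arraysOf)
import qualified Streamly.Internal.Data.Array.Stream.Foreign as ArrayStream
import qualified Streamly.Internal.Data.Fold as Fold
import qualified Streamly.Internal.Data.Parser.ParserD as ParserD

exampleParse :: IO ()
exampleParse = do
    -- 10 Ints split into arrays of 4 elements each
    let chunks = Stream.arraysOf 4 (Stream.fromList [1 .. 10 :: Int])
        prefix = ParserD.fromFold (Fold.take 6 Fold.toList)
    -- Parse a 6-element prefix; 'rest' is the unconsumed part of the stream.
    (xs, rest) <- ArrayStream.parse prefix chunks
    print xs                                              -- expected: [1..6]
    Stream.toList (ArrayStream.concat rest) >>= print     -- expected: [7..10]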
-------------------------------------------------------------------------------
-- Elimination - Running Array Folds and parsers
-------------------------------------------------------------------------------
{-# INLINE_NORMAL parseArrD #-}
parseArrD ::
forall m a b. (MonadIO m, MonadThrow m, Storable a)
@@ -540,7 +710,7 @@ parseArrD (PRD.Parser pstep initial extract) stream@(D.Stream step state) = do
{-# INLINE parseArr #-}
parseArr ::
(MonadIO m, MonadThrow m, Storable a)
=> PRD.Parser m a b
=> ASF.Parser m a b
-> SerialT m (A.Array a)
-> m (b, SerialT m (A.Array a))
parseArr p s = fmap D.fromStreamD <$> parseD p (D.toStreamD s)

View File

@@ -123,6 +123,7 @@ extra-source-files:
test/Streamly/Test/Data/Array/Prim.hs
test/Streamly/Test/Data/Array/Prim/Pinned.hs
test/Streamly/Test/Data/Array/Foreign.hs
test/Streamly/Test/Data/Array/Stream/Foreign.hs
test/Streamly/Test/Data/Parser/ParserD.hs
test/Streamly/Test/FileSystem/Event.hs
test/Streamly/Test/FileSystem/Handle.hs

View File

@@ -0,0 +1,72 @@
module Main (main) where
import Streamly.Test.Common (listEquals, chooseInt)
import Test.Hspec (hspec, describe)
import Test.Hspec.QuickCheck
import Test.QuickCheck (forAll, Property, vectorOf, Gen)
import Test.QuickCheck.Monadic (monadicIO, run)
import qualified Streamly.Internal.Data.Array.Foreign as Array
import qualified Streamly.Internal.Data.Array.Stream.Foreign as ArrayStream
import qualified Streamly.Internal.Data.Fold as Fold
import qualified Streamly.Internal.Data.Parser.ParserD as ParserD
import qualified Streamly.Internal.Data.Stream.IsStream as Stream
import qualified Test.Hspec as Hspec
import Prelude hiding (sequence)
#if MIN_VERSION_QuickCheck(2,14,0)
import Test.QuickCheck (chooseAny)
#else
import System.Random (Random(random))
import Test.QuickCheck.Gen (Gen(MkGen))
-- | Generates a random element over the natural range of `a`.
chooseAny :: Random a => Gen a
chooseAny = MkGen (\r _ -> let (x,_) = random r in x)
#endif
maxTestCount :: Int
maxTestCount = 100
parse :: Property
parse = do
let len = 200
-- ls = input list (stream)
-- clen = chunk size
-- tlen = parser take size
forAll
((,,)
<$> vectorOf len (chooseAny :: Gen Int)
<*> chooseInt (1, len)
<*> chooseInt (0, len))
$ \(ls, clen, tlen) ->
monadicIO $ do
(ls1, str) <-
let input =
Stream.chunksOf
clen (Array.writeN clen) (Stream.fromList ls)
parser = ParserD.fromFold (Fold.take tlen Fold.toList)
in run $ ArrayStream.parse parser input
ls2 <- run $ Stream.toList $ ArrayStream.concat str
listEquals (==) (ls1 ++ ls2) ls
-------------------------------------------------------------------------------
-- Main
-------------------------------------------------------------------------------
moduleName :: String
moduleName = "Data.Array.Stream.Foreign"
main :: IO ()
main =
hspec $
Hspec.parallel $
modifyMaxSuccess (const maxTestCount) $ do
describe moduleName $ do
describe "Stream parsing" $ do
prop "parse" parse

View File

@@ -222,6 +222,11 @@ test-suite Data.Array.Foreign
main-is: Streamly/Test/Data/Array/Foreign.hs
ghc-options: -main-is Streamly.Test.Data.Array.Foreign.main
test-suite Data.Array.Stream.Foreign
import: test-options
type: exitcode-stdio-1.0
main-is: Streamly/Test/Data/Array/Stream/Foreign.hs
test-suite Data.Fold
import: test-options
type: exitcode-stdio-1.0