Use all config options in Data.Stream.Concurrent tests (#1928)

Co-authored-by: Harendra Kumar <harendra@composewell.com>
This commit is contained in:
Ranjeet Ranjan 2023-07-25 15:29:21 +05:30 committed by GitHub
parent 84529565ca
commit eea1e0f634
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -23,8 +23,9 @@ import Test.QuickCheck (Testable, Property, choose, forAll, withMaxSuccess)
import Test.QuickCheck.Monadic (monadicIO, run)
import Test.Hspec as H
import qualified Streamly.Data.Fold as Fold
import qualified Streamly.Data.Fold as Fold ( toList )
import qualified Streamly.Data.Stream as Stream
( replicate, fromEffect, fromPure, fromList, fold, take, nil )
import qualified Streamly.Internal.Data.Stream.Concurrent as Async
import Streamly.Test.Common (listEquals)
@ -39,10 +40,10 @@ moduleName = "Data.Stream.Concurrent"
sortEq :: Ord a => [a] -> [a] -> Bool
sortEq a b = sort a == sort b
cmp :: Stream IO Int -> ([Int] -> [Int] -> Bool) -> [Int] -> Property
cmp s eq list =
cmp :: (Show a, Ord a) => ([a] -> [a] -> Bool) -> [a] -> Stream IO a -> Property
cmp eq list s =
monadicIO $ do
stream <- run $ Stream.fold Fold.toList s
stream <- run $ sort <$> Stream.fold Fold.toList s
listEquals eq stream list
prop1 :: Testable prop => String -> prop -> SpecWith ()
@ -226,94 +227,137 @@ main = hspec
$ describe moduleName $ do
let transform = transformCombineFromList Stream.fromList sortEq
prop "eval" $
prop "parEval" $
transform
(fmap (+2))
(fmap (+1) . Async.parEval id . fmap (+1))
asyncSpec $ prop "parSequence" . sequenceReplicate
-- XXX Need to use asyncSpec in all tests
prop "mapM (+1)" $
transform (fmap (+1)) (Async.parMapM id (\x -> return (x + 1)))
asyncSpec $
prop "parMapM (+1)"
. transform (fmap (+1))
. (`Async.parMapM` (\x -> return (x + 1)))
-- XXX Need to use eq instead of sortEq for ahead oeprations
-- Binary append
prop1 "append [] []"
$ cmp (Async.parList id [Stream.nil, Stream.nil]) sortEq []
prop1 "append [] [1]"
$ cmp (Async.parList id [Stream.nil, Stream.fromPure 1]) sortEq [1]
prop1 "append [1] []"
$ cmp (Async.parList id [Stream.fromPure 1, Stream.nil]) sortEq [1]
prop1 "append [0] [1]"
$ let stream = Async.parList id [Stream.fromPure 0, Stream.fromPure 1]
in cmp stream sortEq [0, 1]
asyncSpec $
let appWith cfg = Async.parList cfg [Stream.nil, Stream.nil]
in prop1 "parList [] []" . cmp sortEq ([] :: [Int]) . appWith
prop1 "append [0] [] [1]"
$ let stream =
Async.parList id
[Stream.fromPure 0, Stream.nil, Stream.fromPure 1]
in cmp stream sortEq [0, 1]
asyncSpec $
let appWith cfg = Async.parList cfg [Stream.nil, Stream.fromPure 1]
in prop1 "parList [] [1]" . cmp sortEq [1 :: Int] . appWith
let async = Async.parTwo id
prop1 "append2 left associated"
$ let stream =
Stream.fromPure 0
`async` Stream.fromPure 1
`async` Stream.fromPure 2
`async` Stream.fromPure 3
in cmp stream sortEq [0, 1, 2, 3]
asyncSpec $
let appWith cfg = Async.parList cfg [Stream.fromPure 1, Stream.nil]
in prop1 "parList [1] []" . cmp sortEq [1 :: Int] . appWith
prop1 "append right associated"
$ let stream =
Stream.fromPure 0
`async` (Stream.fromPure 1
`async` (Stream.fromPure 2
`async` Stream.fromPure 3))
in cmp stream sortEq [0, 1, 2, 3]
asyncSpec $
let appWith cfg =
Async.parList cfg [Stream.fromPure 0, Stream.fromPure 1]
in prop1 "parList [0] [1]" . cmp sortEq [0, 1 :: Int] . appWith
prop1 "append balanced"
$ let leaf x y = Stream.fromPure x `async` Stream.fromPure y
leaf11 = leaf 0 1 `async` leaf 2 (3 :: Int)
leaf12 = leaf 4 5 `async` leaf 6 7
stream = leaf11 `async` leaf12
in cmp stream sortEq [0, 1, 2, 3, 4, 5, 6,7]
asyncSpec $
let appWith cfg =
Async.parList
cfg [Stream.fromPure 0, Stream.nil, Stream.fromPure 1]
in prop1 "parList [0] [] [1]" . cmp sortEq [0, 1 :: Int] . appWith
prop1 "combineWith (maxThreads 1)"
$ let stream =
Async.parTwo (Async.maxThreads 1)
(Stream.fromList [1,2,3,4,5])
asyncSpec $
let appWith cfg =
Async.parTwo cfg
(Async.parTwo cfg
(Async.parTwo cfg
(Stream.fromPure 0) (Stream.fromPure 1))
(Stream.fromPure 2))
(Stream.fromPure 3)
in prop1 "parTwo left associated"
. cmp sortEq [0, 1, 2, 3 :: Int] . appWith
asyncSpec $
let appWith cfg =
Async.parTwo cfg
(Stream.fromPure 0)
(Async.parTwo cfg
(Stream.fromPure 1)
(Async.parTwo cfg
(Stream.fromPure 2) (Stream.fromPure 3))
)
in prop1 "parTwo right associated"
. cmp sortEq [0, 1, 2, 3 :: Int] . appWith
asyncSpec $
let leaf x y cfg =
Async.parTwo cfg (Stream.fromPure x)
(Stream.fromPure y)
leaf11 cfg =
Async.parTwo cfg (leaf 0 1 cfg) $ leaf 2 (3 :: Int) cfg
leaf12 cfg =
Async.parTwo cfg (leaf 4 5 cfg) $ leaf 6 7 cfg
appWith cfg =
Async.parTwo cfg (leaf11 cfg) (leaf12 cfg)
in prop1 "parTwo balanced"
. cmp sortEq [0, 1, 2, 3, 4, 5, 6,7] . appWith
asyncSpec $
let appWith cfg =
Async.parTwo cfg
(Stream.fromList [1,2,3,4,5 :: Int])
(Stream.fromList [6,7,8,9,10])
in cmp stream (==) [1,2,3,4,5,6,7,8,9,10]
in prop1 "parTwo" . cmp (==) [1,2,3,4,5,6,7,8,9,10] . appWith
prop1 "apply (async arg1)"
$ let s1 = Async.parApply id (Stream.fromPure (,)) (Stream.fromPure 1 `async` Stream.fromPure 2)
s2 = Async.parApply id s1 (Stream.fromPure 3) :: Stream IO (Int, Int)
xs = Stream.fold Fold.toList s2
in sort <$> xs `shouldReturn` [(1, 3), (2, 3)]
asyncSpec $
let par2 cfg =
Async.parTwo
cfg
(Stream.fromPure 1)
(Stream.fromPure 2)
s1 cfg =
Async.parApply
cfg
(Stream.fromPure (,))
(par2 cfg)
s2 cfg =
Async.parApply
cfg
(s1 cfg)
(Stream.fromPure 3) :: Stream IO (Int, Int)
in prop1
"parApply (async arg1)" . cmp (==) ( [(1, 3), (2, 3)]) . s2
prop1 "apply (async arg2)"
$ let s1 = Stream.fromPure (1,)
s2 = Async.parApply id s1 (Stream.fromPure 2 `async` Stream.fromPure 3)
xs = Stream.fold Fold.toList s2 :: IO [(Int, Int)]
in sort <$> xs `shouldReturn` [(1, 2), (1, 3)]
asyncSpec $
let par2 cfg =
Async.parTwo
cfg
(Stream.fromPure (2 :: Int))
(Stream.fromPure 3)
s1 = Stream.fromPure (1 :: Int,)
s2 cfg = Async.parApply cfg s1 (par2 cfg)
in prop1 "apply (async arg2)" . cmp (==) ([(1, 2), (1, 3)]) . s2
-- concat
prop1 "concat"
$ let stream =
Async.parConcat id
asyncSpec $
let stream cfg =
Async.parConcat cfg
$ fmap Stream.fromPure
$ Stream.fromList [1..100]
in cmp stream sortEq [1..100]
$ Stream.fromList [1..100 :: Int]
in prop1 "parConcat" . cmp sortEq [1..100] . stream
prop "concatMap" $
forAll (choose (0, 100)) $ \n ->
transform
(concatMap (const [1..n]))
(Async.parConcatMap id (const (Stream.fromList [1..n])))
asyncSpec $
let f cfg =
forAll (choose (0, 100)) $ \n ->
transform
(concatMap (const [1..n]))
(Async.parConcatMap
cfg (const (Stream.fromList [1..n]))
)
in prop "parConcatMap" . f
#ifdef DEVBUILD
describe "Time ordering" $ timeOrdering (Async.parList id)
#endif
let async = Async.parTwo id
describe "Exception propagation" $ exceptionPropagation async
-- Ad-hoc tests
it "takes n from stream of streams" $ takeCombined 2