streamly/test/Streamly/Test/Prelude/Async.hs

148 lines
5.6 KiB
Haskell

{-# OPTIONS_GHC -Wno-deprecations #-}
-- |
-- Module : Streamly.Test.Prelude.Async
-- Copyright : (c) 2020 Composewell Technologies
--
-- License : BSD-3-Clause
-- Maintainer : streamly@composewell.com
-- Stability : experimental
-- Portability : GHC
module Streamly.Test.Prelude.Async (main) where
import Control.Concurrent (threadDelay)
import Data.List (sort)
import Test.Hspec.QuickCheck
import Test.QuickCheck (Property, withMaxSuccess)
import Test.QuickCheck.Monadic (monadicIO, run)
import Test.Hspec as H
import Streamly.Prelude
import qualified Streamly.Prelude as S
import Data.IORef
import Streamly.Test.Common
import Streamly.Test.Prelude.Common
moduleName :: String
moduleName = "Prelude.Async"
constructfromAsyncSingleThread ::
S.AsyncT IO Int -> S.AsyncT IO Int-> [Int] -> Property
constructfromAsyncSingleThread s1 s2 res =
withMaxSuccess maxTestCount $
monadicIO $ do
x <- run
$ S.toList
$ S.fromAsync
$ S.maxThreads 1
$ s1 `S.async` s2
equals (==) x res
concurrentApplicative :: IO ()
concurrentApplicative = do
ref <- newIORef []
let action i = modifyIORef ref (++ [i]) >> return (i :: Int)
s1 = S.fromEffect (threadDelay 2000000 >> action 1)
s2 = S.fromEffect (threadDelay 1000000 >> action 2)
res <- S.toList $ S.fromZipAsync $ (,) <$> s1 <*> s2
refVal <- readIORef ref
res `shouldBe` [(1, 2)]
refVal `shouldBe` [2, 1]
main :: IO ()
main = hspec
$ H.parallel
#ifdef COVERAGE_BUILD
$ modifyMaxSuccess (const 10)
#endif
$ describe moduleName $ do
let asyncOps :: IsStream t => ((AsyncT IO a -> t IO a) -> Spec) -> Spec
asyncOps spec = mapOps spec $ makeOps fromAsync
#ifndef COVERAGE_BUILD
<> [("maxBuffer (-1)", fromAsync . maxBuffer (-1))]
#endif
describe "Construction" $ do
asyncOps $ prop "asyncly replicateM" . constructWithReplicateM
asyncOps $ prop "asyncly cons" . constructWithCons S.cons
asyncOps $ prop "asyncly consM" . constructWithConsM S.consM sort
asyncOps $ prop "asyncly (.:)" . constructWithCons (S..:)
asyncOps $ prop "asyncly (|:)" . constructWithConsM (S.|:) sort
prop "asyncSingleThreaded" $
constructfromAsyncSingleThread
(S.fromList [1,2,3,4,5])
(S.fromList [6,7,8,9,10])
[1,2,3,4,5,6,7,8,9,10]
describe "Functor operations" $ do
asyncOps $ functorOps S.fromFoldable "asyncly" sortEq
asyncOps $ functorOps folded "asyncly folded" sortEq
describe "Monoid operations" $ do
asyncOps $ monoidOps "asyncly" mempty sortEq
describe "Async loops" $ loops fromAsync sort sort
describe "Bind and Monoidal composition combinations" $ do
asyncOps $ bindAndComposeSimpleOps "Async" sortEq
asyncOps $ bindAndComposeHierarchyOps "Async"
asyncOps $ nestTwoStreams "Async" sort sort
asyncOps $ nestTwoStreamsApp "Async" sort sort
describe "Semigroup operations" $ do
asyncOps $ semigroupOps "asyncly" sortEq
describe "Applicative operations" $ do
asyncOps $ applicativeOps S.fromFoldable "asyncly applicative" sortEq
asyncOps $ applicativeOps folded "asyncly applicative folded" sortEq
-- XXX add tests for indexed/indexedR
describe "Zip operations" $ do
-- We test only the serial zip with serial streams and the parallel
-- stream, because the rate setting in these streams can slow down
-- zipAsync.
asyncOps $ prop "zip applicative asyncly" . zipAsyncApplicative S.fromFoldable (==)
asyncOps $ prop "zip applicative asyncly folded" . zipAsyncApplicative folded (==)
asyncOps $ prop "zip monadic asyncly" . zipAsyncMonadic S.fromFoldable (==)
asyncOps $ prop "zip monadic asyncly folded" . zipAsyncMonadic folded (==)
it "zip monadic asyncly order" concurrentApplicative
-- XXX add merge tests like zip tests
-- for mergeBy, we can split a list randomly into two lists and
-- then merge them, it should result in original list
-- describe "Merge operations" $ do
describe "Monad operations" $ do
asyncOps $ prop "asyncly monad then" . monadThen S.fromFoldable sortEq
asyncOps $ prop "asyncly monad then folded" . monadThen folded sortEq
asyncOps $ prop "asyncly monad bind" . monadBind S.fromFoldable sortEq
asyncOps $ prop "asyncly monad bind folded" . monadBind folded sortEq
describe "Stream transform and combine operations" $ do
asyncOps $ transformCombineOpsCommon S.fromFoldable "asyncly" sortEq
asyncOps $ transformCombineOpsCommon folded "asyncly" sortEq
describe "Stream elimination operations" $ do
asyncOps $ eliminationOps S.fromFoldable "asyncly"
asyncOps $ eliminationOps folded "asyncly folded"
asyncOps $ eliminationOpsWord8 S.fromFoldable "asyncly"
asyncOps $ eliminationOpsWord8 folded "asyncly folded"
-- test both (<>) and mappend to make sure we are using correct instance
-- for Monoid that is using the right version of semigroup. Instance
-- deriving can cause us to pick wrong instances sometimes.
#ifdef DEVBUILD
describe "Async (<>) time order check" $ parallelCheck fromAsync (<>)
describe "Async mappend time order check" $ parallelCheck fromAsync mappend
#endif
describe "Tests for exceptions" $ asyncOps $ exceptionOps "asyncly"
describe "Composed MonadThrow asyncly" $ composeWithMonadThrow fromAsync
-- Ad-hoc tests
it "takes n from stream of streams" $ takeCombined 2 fromAsync