mirror of
https://github.com/ilyakooo0/streamly.git
synced 2024-09-11 08:25:40 +03:00
148 lines
5.6 KiB
Haskell
148 lines
5.6 KiB
Haskell
{-# OPTIONS_GHC -Wno-deprecations #-}
|
|
|
|
-- |
|
|
-- Module : Streamly.Test.Prelude.Async
|
|
-- Copyright : (c) 2020 Composewell Technologies
|
|
--
|
|
-- License : BSD-3-Clause
|
|
-- Maintainer : streamly@composewell.com
|
|
-- Stability : experimental
|
|
-- Portability : GHC
|
|
|
|
module Streamly.Test.Prelude.Async (main) where
|
|
|
|
import Control.Concurrent (threadDelay)
|
|
import Data.List (sort)
|
|
import Test.Hspec.QuickCheck
|
|
import Test.QuickCheck (Property, withMaxSuccess)
|
|
import Test.QuickCheck.Monadic (monadicIO, run)
|
|
import Test.Hspec as H
|
|
|
|
import Streamly.Prelude
|
|
import qualified Streamly.Prelude as S
|
|
|
|
import Data.IORef
|
|
import Streamly.Test.Common
|
|
import Streamly.Test.Prelude.Common
|
|
|
|
|
|
-- | Label passed to the top-level @describe@ that groups every test in this
-- module.
moduleName :: String
moduleName =
    "Prelude.Async"
|
|
|
|
-- | Property: composing @s1@ and @s2@ with 'S.async' under @S.maxThreads 1@
-- and collecting the result into a list gives @res@.
--
-- The collected list is checked against @res@ with @equals (==)@, so the
-- element order is part of the expectation.
constructfromAsyncSingleThread ::
    S.AsyncT IO Int -> S.AsyncT IO Int -> [Int] -> Property
constructfromAsyncSingleThread s1 s2 res =
    withMaxSuccess maxTestCount (monadicIO check)

    where

    -- Both inputs composed asynchronously, with the thread limit set to 1.
    limited = S.maxThreads 1 (s1 `S.async` s2)

    check = do
        x <- run (S.toList (S.fromAsync limited))
        equals (==) x res
|
|
|
|
-- | Checks that the zip-async applicative runs both of its argument streams
-- concurrently while still pairing the results positionally.
--
-- The first stream sleeps for 2 seconds and the second for 1 second before
-- each records its tag in a shared 'IORef'. The zipped result must be
-- @[(1, 2)]@, while the recorded completion order must be @[2, 1]@, that is,
-- the second effect finished before the first one.
concurrentApplicative :: IO ()
concurrentApplicative = do
    ref <- newIORef []
    -- The test expects the two effects to run concurrently, so the shared
    -- log is updated atomically (and strictly). A plain 'modifyIORef' is a
    -- non-atomic read-modify-write and could drop an entry if the updates
    -- ever overlapped.
    let record i = atomicModifyIORef' ref (\done -> (done ++ [i], ()))
        action i = record i >> return (i :: Int)
        s1 = S.fromEffect (threadDelay 2000000 >> action 1)
        s2 = S.fromEffect (threadDelay 1000000 >> action 2)
    res <- S.toList $ S.fromZipAsync $ (,) <$> s1 <*> s2
    refVal <- readIORef ref
    -- Results are paired by position, independent of completion order.
    res `shouldBe` [(1, 2)]
    -- The shorter delay must have completed first.
    refVal `shouldBe` [2, 1]
|
|
|
|
-- | Runs the hspec suite for the Async stream type.
--
-- All specs are grouped under @describe moduleName@ and marked with
-- @H.parallel@. When @COVERAGE_BUILD@ is defined, the QuickCheck success
-- count is capped at 10 and the extra @maxBuffer (-1)@ stream configuration
-- is left out of the tested configurations.
main :: IO ()
main = hspec
  $ H.parallel
#ifdef COVERAGE_BUILD
  $ modifyMaxSuccess (const 10)
#endif
  $ describe moduleName $ do
    -- Applies a spec to every stream configuration produced by
    -- @makeOps fromAsync@ (plus, outside coverage builds, an additional
    -- @maxBuffer (-1)@ configuration).
    -- NOTE(review): mapOps and makeOps are defined elsewhere; presumably
    -- mapOps runs the spec once per named configuration - confirm.
    let asyncOps :: IsStream t => ((AsyncT IO a -> t IO a) -> Spec) -> Spec
        asyncOps spec = mapOps spec $ makeOps fromAsync
#ifndef COVERAGE_BUILD
            <> [("maxBuffer (-1)", fromAsync . maxBuffer (-1))]
#endif

    describe "Construction" $ do
        asyncOps $ prop "asyncly replicateM" . constructWithReplicateM
        asyncOps $ prop "asyncly cons" . constructWithCons S.cons
        asyncOps $ prop "asyncly consM" . constructWithConsM S.consM sort
        asyncOps $ prop "asyncly (.:)" . constructWithCons (S..:)
        asyncOps $ prop "asyncly (|:)" . constructWithConsM (S.|:) sort
        prop "asyncSingleThreaded" $
            constructfromAsyncSingleThread
                (S.fromList [1,2,3,4,5])
                (S.fromList [6,7,8,9,10])
                [1,2,3,4,5,6,7,8,9,10]

    describe "Functor operations" $ do
        asyncOps $ functorOps S.fromFoldable "asyncly" sortEq
        asyncOps $ functorOps folded "asyncly folded" sortEq

    describe "Monoid operations" $ do
        asyncOps $ monoidOps "asyncly" mempty sortEq

    describe "Async loops" $ loops fromAsync sort sort

    describe "Bind and Monoidal composition combinations" $ do
        asyncOps $ bindAndComposeSimpleOps "Async" sortEq
        asyncOps $ bindAndComposeHierarchyOps "Async"
        asyncOps $ nestTwoStreams "Async" sort sort
        asyncOps $ nestTwoStreamsApp "Async" sort sort

    describe "Semigroup operations" $ do
        asyncOps $ semigroupOps "asyncly" sortEq

    describe "Applicative operations" $ do
        asyncOps $ applicativeOps S.fromFoldable "asyncly applicative" sortEq
        asyncOps $ applicativeOps folded "asyncly applicative folded" sortEq

    -- XXX add tests for indexed/indexedR
    describe "Zip operations" $ do
        -- We test only the serial zip with serial streams and the parallel
        -- stream, because the rate setting in these streams can slow down
        -- zipAsync.
        asyncOps $ prop "zip applicative asyncly" . zipAsyncApplicative S.fromFoldable (==)
        asyncOps $ prop "zip applicative asyncly folded" . zipAsyncApplicative folded (==)
        asyncOps $ prop "zip monadic asyncly" . zipAsyncMonadic S.fromFoldable (==)
        asyncOps $ prop "zip monadic asyncly folded" . zipAsyncMonadic folded (==)
        it "zip monadic asyncly order" concurrentApplicative

    -- XXX add merge tests like zip tests
    -- for mergeBy, we can split a list randomly into two lists and
    -- then merge them, it should result in original list
    -- describe "Merge operations" $ do

    describe "Monad operations" $ do
        asyncOps $ prop "asyncly monad then" . monadThen S.fromFoldable sortEq
        asyncOps $ prop "asyncly monad then folded" . monadThen folded sortEq
        asyncOps $ prop "asyncly monad bind" . monadBind S.fromFoldable sortEq
        asyncOps $ prop "asyncly monad bind folded" . monadBind folded sortEq

    describe "Stream transform and combine operations" $ do
        asyncOps $ transformCombineOpsCommon S.fromFoldable "asyncly" sortEq
        -- Labelled "folded" so these cases can be told apart from the
        -- fromFoldable ones in test reports, matching the other groups.
        asyncOps $ transformCombineOpsCommon folded "asyncly folded" sortEq

    describe "Stream elimination operations" $ do
        asyncOps $ eliminationOps S.fromFoldable "asyncly"
        asyncOps $ eliminationOps folded "asyncly folded"
        asyncOps $ eliminationOpsWord8 S.fromFoldable "asyncly"
        asyncOps $ eliminationOpsWord8 folded "asyncly folded"

    -- test both (<>) and mappend to make sure we are using correct instance
    -- for Monoid that is using the right version of semigroup. Instance
    -- deriving can cause us to pick wrong instances sometimes.

#ifdef DEVBUILD
    describe "Async (<>) time order check" $ parallelCheck fromAsync (<>)
    describe "Async mappend time order check" $ parallelCheck fromAsync mappend
#endif

    describe "Tests for exceptions" $ asyncOps $ exceptionOps "asyncly"
    describe "Composed MonadThrow asyncly" $ composeWithMonadThrow fromAsync

    -- Ad-hoc tests
    it "takes n from stream of streams" $ takeCombined 2 fromAsync
|