Remove the intermediate Concurrent/Time/Exception modules

Harendra Kumar 2023-11-27 04:55:27 +05:30
parent 58439bc1b0
commit fd93674cec
16 changed files with 232 additions and 286 deletions

View File

@@ -14,12 +14,12 @@ where
import Stream.Common
(composeN, benchIO, benchIOSink, benchIOSrc, sourceUnfoldrM)
import Streamly.Data.Stream (Stream)
import Streamly.Internal.Data.Stream.Concurrent (MonadAsync, Config)
import Streamly.Internal.Data.Stream.Prelude (MonadAsync, Config)
import qualified Data.List as List
import qualified Streamly.Data.Fold as Fold
import qualified Streamly.Data.Stream as Stream
import qualified Streamly.Internal.Data.Stream.Concurrent as Async
import qualified Streamly.Internal.Data.Stream.Prelude as Async
import Test.Tasty.Bench
import Prelude hiding (mapM)

View File

@@ -9,7 +9,7 @@
import Stream.ConcurrentCommon (allBenchmarks)
import Streamly.Benchmark.Common (runWithCLIOpts, defaultStreamSize)
import qualified Streamly.Internal.Data.Stream.Concurrent as Async
import qualified Streamly.Internal.Data.Stream.Prelude as Async
moduleName :: String
moduleName = "Data.Stream.ConcurrentEager"

View File

@@ -9,7 +9,7 @@
import Stream.ConcurrentCommon (allBenchmarks)
import Streamly.Benchmark.Common (runWithCLIOpts, defaultStreamSize)
import qualified Streamly.Internal.Data.Stream.Concurrent as Async
import qualified Streamly.Internal.Data.Stream.Prelude as Async
moduleName :: String
moduleName = "Data.Stream.ConcurrentInterleaved"

View File

@@ -9,7 +9,7 @@
import Stream.ConcurrentCommon (allBenchmarks)
import Streamly.Benchmark.Common (runWithCLIOpts, defaultStreamSize)
import qualified Streamly.Internal.Data.Stream.Concurrent as Async
import qualified Streamly.Internal.Data.Stream.Prelude as Async
moduleName :: String
moduleName = "Data.Stream.ConcurrentOrdered"

View File

@@ -42,7 +42,7 @@ import qualified Streamly.Internal.Data.Unfold.Prelude as IUF
import qualified Streamly.Internal.Data.Stream.IsStream as Stream
#else
import qualified Streamly.Internal.Data.Stream as Stream
import qualified Streamly.Internal.Data.Stream.Exception.Lifted as Stream
import qualified Streamly.Internal.Data.Stream.Prelude as Stream
#endif
import Test.Tasty.Bench hiding (env)

View File

@@ -48,7 +48,7 @@ import Streamly.Internal.Data.Stream (Stream)
import qualified Streamly.Internal.Data.Stream as S
#ifndef USE_STREAMLY_CORE
import qualified Streamly.Data.Stream.Prelude as S
import qualified Streamly.Internal.Data.Stream.Time as S
import qualified Streamly.Internal.Data.Stream.Prelude as S
#endif
#ifdef USE_STREAMK

View File

@@ -46,7 +46,7 @@ import Streamly.Internal.Data.Time.Units
import Streamly.Internal.Data.Stream (Stream)
import qualified Streamly.Internal.Data.Stream as Stream
#ifndef USE_STREAMLY_CORE
import qualified Streamly.Internal.Data.Stream.Time as Stream
import qualified Streamly.Internal.Data.Stream.Prelude as Stream
#endif
#ifdef USE_STREAMK
import Control.DeepSeq (NFData(..))

View File

@@ -1,195 +0,0 @@
-- |
-- Module : Streamly.Data.Stream.Concurrent
-- Copyright : (c) 2022 Composewell Technologies
-- License : BSD-3-Clause
-- Maintainer : streamly@composewell.com
-- Stability : released
-- Portability : GHC
--
-- This module provides concurrent streaming abstractions.
--
-- == Programming Tips
--
-- The names in this module do not conflict with other stream modules;
-- therefore, you can import it in the same namespace:
--
-- >>> import qualified Streamly.Data.Stream.Concurrent as Stream
module Streamly.Data.Stream.Concurrent
(
-- $concurrency
-- ** Types
MonadAsync
-- ** Configuration
, Config
-- *** Limits
, maxThreads
, maxBuffer
-- *** Rate Control
, Rate(..)
, rate
, avgRate
, minRate
, maxRate
, constRate
-- *** Stop behavior
, StopWhen (..)
, stopWhen
-- *** Scheduling behavior
, eager
, ordered
, interleaved
-- *** Diagnostics
, inspect
-- ** Combinators
-- | Stream combinators using a concurrent channel.
-- *** Evaluate
-- | Evaluates a serial stream asynchronously using a concurrency channel.
, parEval
-- *** Generate
-- | Uses a concurrency channel to evaluate multiple actions concurrently.
, parRepeatM
, parReplicateM
, fromCallback
-- *** Map
-- | Uses a concurrency channel to evaluate multiple mapped actions
-- concurrently.
, parMapM
, parSequence
-- *** Combine two
-- | Use a channel for each pair.
, parZipWithM
, parZipWith
, parMergeByM
, parMergeBy
-- *** List of streams
-- | Shares a single channel across many streams.
, parList
-- , zipWithM
-- , zipWith
-- *** Stream of streams
-- **** Apply
, parApply
-- **** Concat
-- | Shares a single channel across many streams.
, parConcat
, parConcatMap
-- **** ConcatIterate
, parConcatIterate
-- *** Observation
, parTapCount
-- ** Deprecated
, tapCount
)
where
import Streamly.Internal.Data.Stream.Concurrent
import Prelude hiding (mapM, sequence, concat, concatMap, zipWith)
-- $concurrency
--
-- == Channels
--
-- At a lower level, concurrency is implemented using channels that support
-- concurrent evaluation of streams. We create a channel, and add one or more
-- streams to it. The channel evaluates multiple streams concurrently and then
-- generates a single output stream from the results. How the streams are
-- combined depends on the configuration of the channel.
--
-- == Primitives
--
-- There are only a few fundamental abstractions for concurrency: 'parEval',
-- 'parConcatMap', and 'parConcatIterate'; all concurrency combinators can be
-- expressed in terms of these.
--
-- 'parEval' evaluates a single stream asynchronously: a worker thread runs
-- the stream and buffers the results, and the consumer of the stream runs in
-- another thread consuming it from the buffer, thus decoupling the production
-- and consumption of the stream. This can be used to run different stages of a
-- pipeline concurrently.
--
-- 'parConcatMap' is used to evaluate multiple streams concurrently and combine
-- the results. A stream generator function is mapped to the input stream and
-- all the generated streams are then evaluated concurrently, and the results
-- are combined.
--
-- 'parConcatIterate' is like 'parConcatMap' but iterates a stream generator
-- function recursively over the stream. This can be used to traverse trees or
-- graphs.
--
-- == Configuration
--
-- Concurrent combinators take a 'Config' argument which controls the
-- concurrent behavior. For example, the maximum number of threads to be used
-- ('maxThreads'), the maximum size of the buffer ('maxBuffer'), how the
-- streams are scheduled with respect to each other ('interleaved'), or how the
-- results are consumed ('ordered').
--
-- Configuration is specified as @Config -> Config@ modifier functions that can
-- be composed together using function composition. For example, to specify the
-- maximum number of threads we can use @parConcatMap (maxThreads 10)@; if we
-- also want to specify the maximum buffer we can compose the two options:
-- @parConcatMap (maxThreads 10 . maxBuffer 100)@. To use the default
-- configuration, use 'id' as the config modifier, e.g. @parConcatMap id@.
--
-- See the @Configuration@ section and individual configuration options'
-- documentation for the default behavior and default values of configuration
-- parameters.
--
-- == Scheduling
--
-- The most important configuration option controls whether the output of the
-- concurrent execution is consumed in the same order as the corresponding
-- actions in the input stream, or as soon as it arrives. The default is the
-- latter; however, we can enforce the original order by using the 'ordered'
-- option.
--
-- Another important option controls whether the number of worker threads is
-- automatically increased and decreased based on the consumption rate, or
-- threads are started as aggressively as possible until the 'maxThreads' or
-- 'maxBuffer' limits are hit. The default is the former. However, the 'eager'
-- option can be enabled to use the latter behavior. When 'eager' is on, even
-- if the stream consumer thread blocks, it has no impact on the
-- scheduling of the available tasks.
--
-- == Combinators
--
-- Using the few fundamental concurrency primitives we can implement all the
-- usual streaming combinators with concurrent behavior. Combinators like
-- 'unfoldrM', 'iterateM' that are inherently serial can be evaluated
-- concurrently with respect to the consumer pipeline using 'parEval'.
-- Combinators like 'zipWithM', 'mergeByM' can also use 'parEval' on the input
-- streams to evaluate them concurrently before combining.
--
-- Combinators like 'repeatM', 'replicateM', 'fromListM', 'sequence', 'mapM' in
-- which all actions are independent of each other can be made concurrent using
-- the 'parConcatMap' operation.
--
-- A concurrent 'repeatM' repeats an action using multiple concurrent
-- executions of the action. Similarly, a concurrent 'mapM' performs the mapped
-- action in independent threads.
--
-- Some common concurrent combinators are provided in this module.

View File

@@ -1,23 +0,0 @@
-- |
-- Module : Streamly.Data.Stream.Exception
-- Copyright : (c) 2022 Composewell Technologies
-- License : BSD-3-Clause
-- Maintainer : streamly@composewell.com
-- Stability : released
-- Portability : GHC
--
-- This module is designed such that it does not conflict with the
-- "Streamly.Data.Stream" module.
--
-- >>> import qualified Streamly.Data.Stream.Exception as Stream
--
module Streamly.Data.Stream.Exception
(
after
, bracket
-- , bracket3
, finally
)
where
import Streamly.Internal.Data.Stream.Lifted (bracket, finally, after)
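A minimal usage sketch of these lifted combinators, written against Streamly.Data.Stream.Prelude where they are re-exported after this change (the acquire/release/use argument order of bracket is assumed, not taken from this diff):

import System.IO (IOMode (ReadMode), hClose, hGetLine, openFile)
import qualified Streamly.Data.Stream.Prelude as Stream

-- Open a file, stream its first three lines, and close the handle even if
-- the consumer is interrupted by an exception. Assumes the file has at
-- least three lines.
firstLines :: FilePath -> Stream.Stream IO String
firstLines path =
    Stream.bracket (openFile path ReadMode) hClose
        (\h -> Stream.take 3 (Stream.repeatM (hGetLine h)))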

View File

@@ -8,7 +8,8 @@
--
-- All Stream related combinators including the streamly-core
-- "Streamly.Data.Stream" module, concurrency, time and lifted
-- exception operations.
-- exception operations. For more pre-release operations also see the
-- "Streamly.Internal.Data.Stream.Prelude" module.
--
module Streamly.Data.Stream.Prelude
(
@@ -17,22 +18,205 @@ module Streamly.Data.Stream.Prelude
-- module. For more pre-release combinators also see
-- "Streamly.Internal.Data.Stream" module.
module Streamly.Data.Stream
-- * Concurrent Operations
-- | Also see "Streamly.Internal.Data.Stream.Concurrent" for more
-- pre-release functions.
, module Streamly.Data.Stream.Concurrent
-- $concurrency
-- ** Types
, MonadAsync
-- ** Configuration
, Config
-- *** Limits
, maxThreads
, maxBuffer
-- *** Rate Control
, Rate(..)
, rate
, avgRate
, minRate
, maxRate
, constRate
-- *** Stop behavior
, StopWhen (..)
, stopWhen
-- *** Scheduling behavior
, eager
, ordered
, interleaved
-- *** Diagnostics
, inspect
-- ** Combinators
-- | Stream combinators using a concurrent channel.
-- *** Evaluate
-- | Evaluates a serial stream asynchronously using a concurrency channel.
, parEval
-- *** Generate
-- | Uses a concurrency channel to evaluate multiple actions concurrently.
, parRepeatM
, parReplicateM
, fromCallback
-- *** Map
-- | Uses a concurrency channel to evaluate multiple mapped actions
-- concurrently.
, parMapM
, parSequence
-- *** Combine two
-- | Use a channel for each pair.
, parZipWithM
, parZipWith
, parMergeByM
, parMergeBy
-- *** List of streams
-- | Shares a single channel across many streams.
, parList
-- , zipWithM
-- , zipWith
-- *** Stream of streams
-- **** Apply
, parApply
-- **** Concat
-- | Shares a single channel across many streams.
, parConcat
, parConcatMap
-- **** ConcatIterate
, parConcatIterate
-- *** Observation
, parTapCount
-- * Time Related
-- | Also see "Streamly.Internal.Data.Stream.Time" for more pre-release
-- functions.
, module Streamly.Data.Stream.Time
-- ** Timers
, interject
-- ** Trimming
, takeInterval
, dropInterval
-- ** Chunking
, intervalsOf
-- ** Sampling
, sampleIntervalEnd
, sampleIntervalStart
, sampleBurstEnd
, sampleBurstStart
-- * Lifted Exceptions
-- | Also see "Streamly.Internal.Data.Stream.Exception.Lifted" for more
-- pre-release functions.
, module Streamly.Data.Stream.Exception
, after
, bracket
-- , bracket3
, finally
-- ** Deprecated
, tapCount
)
where
import Streamly.Data.Stream
import Streamly.Data.Stream.Concurrent
import Streamly.Data.Stream.Exception
import Streamly.Data.Stream.Time
import Streamly.Internal.Data.Stream.Prelude
-- $concurrency
--
-- == Channels
--
-- At a lower level, concurrency is implemented using channels that support
-- concurrent evaluation of streams. We create a channel, and add one or more
-- streams to it. The channel evaluates multiple streams concurrently and then
-- generates a single output stream from the results. How the streams are
-- combined depends on the configuration of the channel.
--
-- == Primitives
--
-- There are only a few fundamental abstractions for concurrency: 'parEval',
-- 'parConcatMap', and 'parConcatIterate'; all concurrency combinators can be
-- expressed in terms of these.
--
-- 'parEval' evaluates a single stream asynchronously: a worker thread runs
-- the stream and buffers the results, and the consumer of the stream runs in
-- another thread consuming it from the buffer, thus decoupling the production
-- and consumption of the stream. This can be used to run different stages of a
-- pipeline concurrently.
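--
-- For example, the generation stage below runs in a worker thread while the
-- fold runs in the consumer thread (a minimal sketch, assuming 'parEval'
-- takes a @Config -> Config@ modifier and the stream to evaluate):
--
-- @
-- import qualified Streamly.Data.Fold as Fold
-- import qualified Streamly.Data.Stream.Prelude as Stream
--
-- -- The mapM stage is evaluated asynchronously; its results are buffered
-- -- and summed by the consumer.
-- pipeline :: IO Int
-- pipeline =
--     Stream.fold Fold.sum
--         $ Stream.parEval id
--         $ Stream.mapM (pure . (+ 1))
--         $ Stream.fromList [1 .. 1000 :: Int]
-- @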
--
-- 'parConcatMap' is used to evaluate multiple streams concurrently and combine
-- the results. A stream generator function is mapped to the input stream and
-- all the generated streams are then evaluated concurrently, and the results
-- are combined.
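--
-- For example, mapping each input to an effectful single-element stream and
-- evaluating all of those streams on one channel (a sketch; @fetchPage@ is a
-- hypothetical @String -> IO String@ action):
--
-- @
-- import qualified Streamly.Data.Stream.Prelude as Stream
--
-- fetchAll :: [String] -> Stream.Stream IO String
-- fetchAll urls =
--     Stream.parConcatMap id (Stream.fromEffect . fetchPage)
--         $ Stream.fromList urls
-- @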
--
-- 'parConcatIterate' is like 'parConcatMap' but iterates a stream generator
-- function recursively over the stream. This can be used to traverse trees or
-- graphs.
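--
-- For example, expanding a tree of values concurrently, feeding generated
-- values back for further expansion (a sketch; @childrenOf@ is a hypothetical
-- @Int -> [Int]@ helper, and the argument order of 'parConcatIterate' is
-- assumed to mirror 'parConcatMap'):
--
-- @
-- import qualified Streamly.Data.Stream.Prelude as Stream
--
-- traverseTree :: Int -> Stream.Stream IO Int
-- traverseTree root =
--     Stream.parConcatIterate id (Stream.fromList . childrenOf)
--         $ Stream.fromList [root]
-- @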
--
-- == Configuration
--
-- Concurrent combinators take a 'Config' argument which controls the
-- concurrent behavior. For example, the maximum number of threads to be used
-- ('maxThreads'), the maximum size of the buffer ('maxBuffer'), how the
-- streams are scheduled with respect to each other ('interleaved'), or how the
-- results are consumed ('ordered').
--
-- Configuration is specified as @Config -> Config@ modifier functions that can
-- be composed together using function composition. For example, to specify the
-- maximum number of threads we can use @parConcatMap (maxThreads 10)@; if we
-- also want to specify the maximum buffer we can compose the two options:
-- @parConcatMap (maxThreads 10 . maxBuffer 100)@. To use the default
-- configuration, use 'id' as the config modifier, e.g. @parConcatMap id@.
--
-- See the @Configuration@ section and individual configuration options'
-- documentation for the default behavior and default values of configuration
-- parameters.
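--
-- A complete example combining the two limits (a sketch; 'parMapM' is used
-- here, but any of the @par*@ combinators below accepts the same modifiers):
--
-- @
-- import qualified Streamly.Data.Fold as Fold
-- import qualified Streamly.Data.Stream.Prelude as Stream
--
-- -- At most 10 worker threads, buffering at most 100 unconsumed results.
-- mapConcurrently :: (Int -> IO Int) -> [Int] -> IO [Int]
-- mapConcurrently action xs =
--     Stream.fold Fold.toList
--         $ Stream.parMapM (Stream.maxThreads 10 . Stream.maxBuffer 100) action
--         $ Stream.fromList xs
-- @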
--
-- == Scheduling
--
-- The most important configuration option controls whether the output of the
-- concurrent execution is consumed in the same order as the corresponding
-- actions in the input stream, or as soon as it arrives. The default is the
-- latter; however, we can enforce the original order by using the 'ordered'
-- option.
--
-- Another important option controls whether the number of worker threads is
-- automatically increased and decreased based on the consumption rate, or
-- threads are started as aggressively as possible until the 'maxThreads' or
-- 'maxBuffer' limits are hit. The default is the former. However, the 'eager'
-- option can be enabled to use the latter behavior. When 'eager' is on, even
-- if the stream consumer thread blocks, it has no impact on the
-- scheduling of the available tasks.
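--
-- For example (a sketch; both 'ordered' and 'eager' are assumed to be
-- @Bool@-valued config modifiers, as exported below):
--
-- @
-- import qualified Streamly.Data.Fold as Fold
-- import qualified Streamly.Data.Stream.Prelude as Stream
--
-- -- Results are delivered in input order rather than arrival order.
-- inOrder :: (Int -> IO Int) -> [Int] -> IO [Int]
-- inOrder action xs =
--     Stream.fold Fold.toList
--         $ Stream.parMapM (Stream.ordered True) action
--         $ Stream.fromList xs
--
-- -- Workers are started as aggressively as the limits allow.
-- eagerly :: (Int -> IO Int) -> [Int] -> IO [Int]
-- eagerly action xs =
--     Stream.fold Fold.toList
--         $ Stream.parMapM (Stream.eager True) action
--         $ Stream.fromList xs
-- @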
--
-- == Combinators
--
-- Using the few fundamental concurrency primitives we can implement all the
-- usual streaming combinators with concurrent behavior. Combinators like
-- 'unfoldrM', 'iterateM' that are inherently serial can be evaluated
-- concurrently with respect to the consumer pipeline using 'parEval'.
-- Combinators like 'zipWithM', 'mergeByM' can also use 'parEval' on the input
-- streams to evaluate them concurrently before combining.
--
-- Combinators like 'repeatM', 'replicateM', 'fromListM', 'sequence', 'mapM' in
-- which all actions are independent of each other can be made concurrent using
-- the 'parConcatMap' operation.
--
-- A concurrent 'repeatM' repeats an action using multiple concurrent
-- executions of the action. Similarly, a concurrent 'mapM' performs the mapped
-- action in independent threads.
--
-- Some common concurrent combinators are provided in this module.
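--
-- For example, concurrent counterparts of @replicateM@ and @zipWith@ (a
-- sketch; the argument order of 'parReplicateM' is assumed to mirror
-- 'replicateM' with the config modifier first):
--
-- @
-- import qualified Streamly.Data.Fold as Fold
-- import qualified Streamly.Data.Stream.Prelude as Stream
--
-- -- Run an action 10 times using multiple threads.
-- tenTimes :: IO a -> IO [a]
-- tenTimes action = Stream.fold Fold.toList (Stream.parReplicateM id 10 action)
--
-- -- Evaluate both inputs concurrently before zipping them.
-- zipPar :: Stream.Stream IO a -> Stream.Stream IO b -> Stream.Stream IO (a, b)
-- zipPar = Stream.parZipWith id (,)
-- @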

View File

@@ -1,36 +0,0 @@
-- |
-- Module : Streamly.Data.Stream.Time
-- Copyright : (c) 2022 Composewell Technologies
-- License : BSD-3-Clause
-- Maintainer : streamly@composewell.com
-- Stability : released
-- Portability : GHC
--
-- This module provides time related streaming abstractions.
--
-- The names in this module do not conflict with other stream modules;
-- therefore, you can import it in the same namespace:
--
-- >>> import qualified Streamly.Data.Stream.Time as Stream
--
module Streamly.Data.Stream.Time
(
-- ** Timers
interject
-- ** Trimming
, takeInterval
, dropInterval
-- ** Chunking
, intervalsOf
-- ** Sampling
, sampleIntervalEnd
, sampleIntervalStart
, sampleBurstEnd
, sampleBurstStart
)
where
import Streamly.Internal.Data.Stream.Time
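A minimal sketch of the time-based operations listed above, written against Streamly.Data.Stream.Prelude where they are re-exported after this change (the argument orders shown are assumptions based on the pre-release API):

import qualified Streamly.Data.Fold as Fold
import qualified Streamly.Data.Stream.Prelude as Stream

-- Collect the elements arriving in each 1-second window into a list.
batches :: Stream.Stream IO a -> Stream.Stream IO [a]
batches = Stream.intervalsOf 1.0 Fold.toList

-- Stop consuming the stream after 10 seconds.
firstTenSeconds :: Stream.Stream IO a -> Stream.Stream IO a
firstTenSeconds = Stream.takeInterval 10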

View File

@@ -0,0 +1,19 @@
-- |
-- Module : Streamly.Internal.Data.Stream.Prelude
-- Copyright : (c) 2022 Composewell Technologies
-- License : BSD-3-Clause
-- Maintainer : streamly@composewell.com
-- Stability : experimental
-- Portability : GHC
--
module Streamly.Internal.Data.Stream.Prelude
(
module Streamly.Internal.Data.Stream.Concurrent
, module Streamly.Internal.Data.Stream.Time
, module Streamly.Internal.Data.Stream.Lifted
)
where
import Streamly.Internal.Data.Stream.Concurrent
import Streamly.Internal.Data.Stream.Time
import Streamly.Internal.Data.Stream.Lifted
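With this umbrella module, code that previously imported the internal Concurrent, Time and Lifted modules separately needs only one import (a rough sketch; the example pipeline is illustrative, not from this commit):

import qualified Streamly.Data.Fold as Fold
import qualified Streamly.Data.Stream as Stream
import qualified Streamly.Internal.Data.Stream.Prelude as Stream

-- Concurrent (parMapM, maxThreads) and time-based (takeInterval)
-- combinators are available under a single qualifier alongside the core
-- stream API.
demo :: IO ()
demo =
    Stream.fold Fold.drain
        $ Stream.parMapM (Stream.maxThreads 4) print
        $ Stream.takeInterval 3
        $ Stream.enumerateFromTo (1 :: Int) 1000000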

View File

@@ -119,13 +119,13 @@ import System.Win32.Types (BOOL, DWORD, HANDLE, LPVOID, LPDWORD, failIfFalse_)
import Streamly.Data.Array (Array)
import Streamly.Data.Stream (Stream)
import Streamly.Data.Stream.Concurrent (eager)
import Streamly.Data.Stream.Prelude (eager)
import qualified Data.List.NonEmpty as NonEmpty
import qualified Streamly.Data.Array as A (fromList)
import qualified Streamly.Data.Fold as Fold
import qualified Streamly.Data.Stream as S
import qualified Streamly.Data.Stream.Concurrent as S
import qualified Streamly.Data.Stream.Prelude as S
import qualified Streamly.Unicode.Stream as U
import qualified Streamly.Internal.Unicode.Utf8 as UTF8 (pack, toArray)
import qualified Streamly.Internal.Data.Array as A (read)

View File

@@ -375,10 +375,8 @@ library
, Streamly.Internal.Data.Unfold.Prelude
, Streamly.Internal.Data.Fold.Prelude
, Streamly.Internal.Data.Stream.Concurrent
, Streamly.Internal.Data.Stream.Prelude
, Streamly.Internal.Data.Stream.Zip.Concurrent
, Streamly.Internal.Data.Stream.Time
, Streamly.Internal.Data.Stream.Lifted
-- streamly-unicode (depends on unicode-data)
, Streamly.Internal.Unicode.Utf8
@@ -446,10 +444,6 @@ library
, Streamly.Internal.Data.Channel.Dispatcher
, Streamly.Internal.Data.Channel.Worker
, Streamly.Data.Stream.Exception
, Streamly.Data.Stream.Concurrent
, Streamly.Data.Stream.Time
, Streamly.Internal.Data.Stream.Concurrent.Channel.Type
, Streamly.Internal.Data.Stream.Concurrent.Channel.Dispatcher
, Streamly.Internal.Data.Stream.Concurrent.Channel.Consumer
@@ -457,6 +451,9 @@ library
, Streamly.Internal.Data.Stream.Concurrent.Channel.Interleave
, Streamly.Internal.Data.Stream.Concurrent.Channel.Operations
, Streamly.Internal.Data.Stream.Concurrent.Channel
, Streamly.Internal.Data.Stream.Concurrent
, Streamly.Internal.Data.Stream.Time
, Streamly.Internal.Data.Stream.Lifted
, Streamly.Internal.Data.Fold.Async
, Streamly.Internal.Data.Fold.Concurrent.Channel.Type

View File

@@ -26,7 +26,7 @@ import Test.Hspec as H
import qualified Streamly.Data.Fold as Fold ( toList )
import qualified Streamly.Data.Stream as Stream
( replicate, fromEffect, fromPure, fromList, fold, take, nil )
import qualified Streamly.Internal.Data.Stream.Concurrent as Async
import qualified Streamly.Internal.Data.Stream.Prelude as Async
import Streamly.Test.Common (listEquals)

View File

@@ -67,7 +67,7 @@ import System.IO.Temp (withSystemTempDirectory)
import qualified Data.List.NonEmpty as NonEmpty
import qualified Streamly.Internal.Data.Array as Array
import qualified Streamly.Internal.Data.Stream as Stream
import qualified Streamly.Internal.Data.Stream.Concurrent as Stream
import qualified Streamly.Internal.Data.Stream.Prelude as Stream
import qualified Streamly.Unicode.Stream as Unicode
#if defined(FILESYSTEM_EVENT_LINUX)