Move Fold.Concurrent.Channel to Fold.Channel

Harendra Kumar 2024-02-13 00:21:17 +05:30
parent ad435f8ffd
commit f5e8d91b0e
8 changed files with 141 additions and 137 deletions

View File

@@ -36,7 +36,7 @@ import Data.HashMap.Strict (HashMap)
import Data.Hashable (Hashable)
import Streamly.Data.Fold
import Streamly.Internal.Data.Fold (toContainerIO)
import Streamly.Internal.Data.Fold.Concurrent
import Streamly.Internal.Data.Fold.Prelude
import Streamly.Internal.Data.IsMap.HashMap ()
-- | Split the input stream based on a hashable component of the key field and
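
The doc fragment above describes the kind of hashmap fold this file builds. As a minimal sketch (not part of this commit; toHashMapIO and the public Fold/Stream module names are assumptions for illustration), splitting a keyed stream into a HashMap could look like:

import Data.HashMap.Strict (HashMap)
import Streamly.Internal.Data.Fold.Prelude (toHashMapIO)
import qualified Streamly.Data.Fold as Fold
import qualified Streamly.Data.Stream as Stream

-- Split (key, value) pairs on the key and sum the values for each key.
sumByKey :: IO (HashMap String Int)
sumByKey =
    Stream.fold (toHashMapIO fst (Fold.lmap snd Fold.sum))
        (Stream.fromList [("a", 1), ("b", 2), ("a", 3)])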

View File

@@ -0,0 +1,15 @@
-- |
-- Module : Streamly.Internal.Data.Fold.Channel
-- Copyright : (c) 2022 Composewell Technologies
-- License : BSD-3-Clause
-- Maintainer : streamly@composewell.com
-- Stability : experimental
-- Portability : GHC
module Streamly.Internal.Data.Fold.Channel
(
module Streamly.Internal.Data.Fold.Channel.Type
)
where
import Streamly.Internal.Data.Fold.Channel.Type

View File

@@ -1,15 +1,24 @@
-- |
-- Module : Streamly.Internal.Data.Fold.Concurrent.Channel.Type
-- Module : Streamly.Internal.Data.Fold.Channel.Type
-- Copyright : (c) 2017, 2022 Composewell Technologies
-- License : BSD-3-Clause
-- Maintainer : streamly@composewell.com
-- Stability : experimental
-- Portability : GHC
module Streamly.Internal.Data.Fold.Concurrent.Channel.Type
( Channel (..)
, newChannel
module Streamly.Internal.Data.Fold.Channel.Type
(
-- ** Type
Channel (..)
-- ** Configuration
, Config
, maxBuffer
, boundThreads
, inspect
-- ** Operations
, newChannel
, sendToWorker
, checkFoldStatus
, dumpSVar
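
The Config modifiers exported above compose by ordinary function composition and are passed to newChannel (or parEval) as its (Config -> Config) argument. A minimal sketch, assuming only the export list shown here:

import Streamly.Internal.Data.Fold.Channel.Type
    (Config, maxBuffer, inspect)

-- Compose configuration modifiers; the channel applies them to the
-- default Config when it is created.
cfg :: Config -> Config
cfg = maxBuffer 1000 . inspect True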

View File

@@ -50,8 +50,108 @@
module Streamly.Internal.Data.Fold.Concurrent
(
module Streamly.Internal.Data.Fold.Concurrent.Channel
parEval
)
where
import Streamly.Internal.Data.Fold.Concurrent.Channel
import Control.Concurrent (takeMVar)
import Control.Monad (void, when)
import Control.Monad.IO.Class (MonadIO(liftIO))
import Data.IORef (writeIORef)
import Streamly.Internal.Control.Concurrent (MonadAsync)
import Streamly.Internal.Data.Fold (Fold(..), Step (..))
import Streamly.Internal.Data.Channel.Worker (sendEvent)
import Streamly.Internal.Data.Time.Clock (Clock(Monotonic), getTime)
import Streamly.Internal.Data.Fold.Channel.Type
import Streamly.Internal.Data.Channel.Types
-------------------------------------------------------------------------------
-- Evaluating a Fold
-------------------------------------------------------------------------------
-- XXX Cleanup the fold if the stream is interrupted. Add a GC hook.
-- | Evaluate a fold asynchronously using a concurrent channel. The driver just
-- queues the input stream values to the fold channel buffer and returns. The
-- fold evaluates the queued values asynchronously. On finalization, 'parEval'
-- waits for the asynchronous fold to complete before it returns.
--
{-# INLINABLE parEval #-}
parEval :: MonadAsync m => (Config -> Config) -> Fold m a b -> Fold m a b
parEval modifier f =
Fold step initial extract final
where
-- XXX Supply an output channel to the fold. The fold would send the result
-- from each step (i.e. scan result) to the channel. The Partial and Done
-- constructors are sent to the channel. We then draw the resulting stream
-- from that channel. A kind of concurrent mapping on the stream, but with a
-- fold/scan.
--
-- There can also be a model where multiple folds pick input from the same
-- channel.
--
-- We can also run parsers this way. Instead of sending output on each
-- step, a parser would send output only once, when it is done.
initial = Partial <$> newChannel modifier f
-- XXX This is not truly asynchronous. If the fold is done, we only get to
-- know about it when we send the next input, unless the stream ends. We could
-- potentially throw an async exception to the driver to inform it
-- asynchronously. Alternatively, the stream should not block forever, it
-- should keep polling the fold status. We can insert a timer tick in the
-- input stream to do that.
--
-- A polled stream abstraction may be useful; it would consist of normal
-- events and tick events, the latter being guaranteed to arrive.
--
-- XXX We can use the config to indicate if the fold is a scanning type or
-- one-shot, or use a separate parEvalScan for scanning. For a scanning
-- type fold the worker would always send the intermediate values back to
-- the driver. An intermediate value can be returned on an input, or the
-- driver can poll even without input, if we have the Skip input support.
-- When the buffer is full we can return "Skip" and then the next step
-- without input can wait for an output to arrive. Similarly, when "final"
-- is called it can return "Skip" to continue or "Done" to indicate
-- termination.
step chan a = do
status <- sendToWorker chan a
return $ case status of
Nothing -> Partial chan
Just b -> Done b
-- XXX We could use a separate type for non-scanning folds, but that would
-- introduce a lot of complexity. Are there combinators that rely on the
-- "extract" function even in non-scanning use cases?
-- Instead of making such folds partial, we could also make them return a
-- Maybe type.
extract _ = error "Concurrent folds do not support scanning"
-- XXX Depending on the use case we may want to either wait for the result
-- or cancel the ongoing work; we could use the config to control that.
-- Currently it waits for the work to complete.
final chan = do
liftIO $ void
$ sendEvent
(outputQueue chan)
(outputDoorBell chan)
ChildStopChannel
status <- checkFoldStatus chan
case status of
Nothing -> do
liftIO
$ withDiagMVar
(svarInspectMode chan)
(dumpSVar chan)
"parEval: waiting to drain"
$ takeMVar (outputDoorBellFromConsumer chan)
-- XXX remove recursion
final chan
Just b -> do
when (svarInspectMode chan) $ liftIO $ do
t <- getTime Monotonic
writeIORef (svarStopTime (svarStats chan)) (Just t)
printSVar (dumpSVar chan) "SVar Done"
return b
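
For reference, the relocated parEval is driven from the stream side: the driver queues input on the channel and the fold runs on its own thread. A minimal usage sketch, assuming the module path introduced by this commit and the public Stream/Fold APIs:

import Streamly.Internal.Data.Fold.Concurrent (parEval)
import qualified Streamly.Data.Fold as Fold
import qualified Streamly.Data.Stream as Stream

-- Sum a stream on the fold's own thread; the result is collected when
-- the stream ends and the channel is drained.
main :: IO ()
main = do
    r <- Stream.fold (parEval id Fold.sum)
             (Stream.enumerateFromTo (1 :: Int) 1000000)
    print r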

View File

@@ -1,123 +0,0 @@
-- |
-- Module : Streamly.Internal.Data.Fold.Concurrent.Channel
-- Copyright : (c) 2022 Composewell Technologies
-- License : BSD-3-Clause
-- Maintainer : streamly@composewell.com
-- Stability : experimental
-- Portability : GHC
module Streamly.Internal.Data.Fold.Concurrent.Channel
(
module Streamly.Internal.Data.Fold.Concurrent.Channel.Type
-- * Configuration
, maxBuffer
, boundThreads
, inspect
-- * Fold operations
, parEval
)
where
import Control.Concurrent (takeMVar)
import Control.Monad (void, when)
import Control.Monad.IO.Class (MonadIO(liftIO))
import Data.IORef (writeIORef)
import Streamly.Internal.Control.Concurrent (MonadAsync)
import Streamly.Internal.Data.Fold (Fold(..), Step (..))
import Streamly.Internal.Data.Channel.Worker (sendEvent)
import Streamly.Internal.Data.Time.Clock (Clock(Monotonic), getTime)
import Streamly.Internal.Data.Fold.Concurrent.Channel.Type
import Streamly.Internal.Data.Channel.Types
-------------------------------------------------------------------------------
-- Evaluating a Fold
-------------------------------------------------------------------------------
-- XXX Cleanup the fold if the stream is interrupted. Add a GC hook.
-- | Evaluate a fold asynchronously using a concurrent channel. The driver just
-- queues the input stream values to the fold channel buffer and returns. The
-- fold evaluates the queued values asynchronously. On finalization, 'parEval'
-- waits for the asynchronous fold to complete before it returns.
--
{-# INLINABLE parEval #-}
parEval :: MonadAsync m => (Config -> Config) -> Fold m a b -> Fold m a b
parEval modifier f =
Fold step initial extract final
where
-- XXX Supply an output channel to the fold. The fold would send the result
-- from each step (i.e. scan result) to the channel. The Partial and Done
-- constructors are sent to the channel. We then draw the resulting stream
-- from that channel. Kind of concurrrent mapping on the stream but with a
-- fold/scan.
--
-- There can also be a model where multiple folds pick input from the same
-- channel.
--
-- We can also run parsers this way. So instead of sending output on each
-- step it can send once it is done.
initial = Partial <$> newChannel modifier f
-- XXX This is not truly asynchronous. If the fold is done we only get to
-- know when we send the next input unless the stream ends. We could
-- potentially throw an async exception to the driver to inform it
-- asynchronously. Alternatively, the stream should not block forever, it
-- should keep polling the fold status. We can insert a timer tick in the
-- input stream to do that.
--
-- A polled stream abstraction may be useful, it would consist of normal
-- events and tick events, latter are guaranteed to arrive.
--
-- XXX We can use the config to indicate if the fold is a scanning type or
-- one-shot, or use a separate parEvalScan for scanning. For a scanning
-- type fold the worker would always send the intermediate values back to
-- the driver. An intermediate value can be returned on an input, or the
-- driver can poll even without input, if we have the Skip input support.
-- When the buffer is full we can return "Skip" and then the next step
-- without input can wait for an output to arrive. Similarly, when "final"
-- is called it can return "Skip" to continue or "Done" to indicate
-- termination.
step chan a = do
status <- sendToWorker chan a
return $ case status of
Nothing -> Partial chan
Just b -> Done b
-- XXX We can use a separate type for non-scanning folds that will
-- introduce a lot of complexity. Are there combinators that rely on the
-- "extract" function even in non-scanning use cases?
-- Instead of making such folds partial we can also make them return a
-- Maybe type.
extract _ = error "Concurrent folds do not support scanning"
-- XXX depending on the use case we may want to either wait for the result
-- or cancel the ongoing work. We can use the config to control that?
-- Currently it waits for the work to complete.
final chan = do
liftIO $ void
$ sendEvent
(outputQueue chan)
(outputDoorBell chan)
ChildStopChannel
status <- checkFoldStatus chan
case status of
Nothing -> do
liftIO
$ withDiagMVar
(svarInspectMode chan)
(dumpSVar chan)
"parEval: waiting to drain"
$ takeMVar (outputDoorBellFromConsumer chan)
-- XXX remove recursion
final chan
Just b -> do
when (svarInspectMode chan) $ liftIO $ do
t <- getTime Monotonic
writeIORef (svarStopTime (svarStats chan)) (Just t)
printSVar (dumpSVar chan) "SVar Done"
return b

View File

@@ -8,12 +8,18 @@
--
module Streamly.Internal.Data.Fold.Prelude
(
module Streamly.Internal.Data.Fold.Time
-- * Channel
module Streamly.Internal.Data.Fold.Channel
-- * Concurrency
, module Streamly.Internal.Data.Fold.Concurrent
-- * Time
, module Streamly.Internal.Data.Fold.Time
-- * Deprecated
, module Streamly.Internal.Data.Fold.SVar
)
where
import Streamly.Internal.Data.Fold.Time
import Streamly.Internal.Data.Fold.Channel
import Streamly.Internal.Data.Fold.Concurrent
import Streamly.Internal.Data.Fold.SVar
import Streamly.Internal.Data.Fold.Time

View File

@@ -8,10 +8,7 @@
--
module Streamly.Internal.Data.Fold.Time
(
-- * Trimming
takeInterval
-- * Splitting
, intervalsOf
)
where

View File

@@ -466,8 +466,8 @@ library
, Streamly.Internal.Data.Stream.Lifted
, Streamly.Internal.Data.Fold.Time
, Streamly.Internal.Data.Fold.Concurrent.Channel.Type
, Streamly.Internal.Data.Fold.Concurrent.Channel
, Streamly.Internal.Data.Fold.Channel.Type
, Streamly.Internal.Data.Fold.Channel
, Streamly.Internal.Data.Fold.Concurrent
, Streamly.Internal.Data.Unfold.Exception