Change mkAsync_ to non-monadic signature

This commit is contained in:
Harendra Kumar 2019-12-27 12:30:50 +05:30
parent 4486bf7216
commit 4c850ca092

View File

@ -854,31 +854,29 @@ distribute :: Monad m => [Fold m a b] -> Fold m a [b]
distribute [] = foldNil
distribute (x:xs) = foldCons x (distribute xs)
-- | Convert a fold into an asynchronous fold. The original fold runs in a
-- separate thread. The step function of the resulting fold just sends the
-- input to the thread. The output of the fold is discarded. If an exception
-- occurs in the original fold, the exception is thrown in the driving thread.
-- Note that exception checking is poll based, we check if the fold has
-- thrown any exceptions only at the point when we push an element to the fold.
-- | Convert a () returning fold into an asynchronous fold. The original fold
-- runs in a separate thread. The step function of the resulting fold just
-- sends the input to the thread. The output of the fold is discarded. If an
-- exception occurs in the original fold, the exception is thrown in the
-- driving thread. Note that exception checking is poll based, we check if the
-- fold has thrown any exceptions only at the point when we push an element to
-- the fold.
--
-- /Internal/
--
{-# INLINE mkAsync_ #-}
mkAsync_ :: MonadAsync m => Fold m a () -> m (Fold m a ())
mkAsync_ fld = do
sv <- newFoldSVar defState fld
let step False a = pushToFold sv a
step True _ = return True
extract _ = do
liftIO $ sendStop sv Nothing
-- drain/wait until a stop event arrives from the fold.
drainFold sv
return (Fold step (return False) extract)
mkAsync_ :: MonadAsync m => Fold m a () -> Fold m a ()
mkAsync_ fld = Fold step initial extract
where
initial = fmap Left (newFoldSVar defState fld)
step (Left sv) a = do
r <- pushToFold sv a
return $ if r then Right sv else Left sv
step acc _ = return acc
-- XXX deduplicate this with tapAsyncD
-- XXX drain it asynchronously?
drainFold svr = do
@ -891,6 +889,14 @@ mkAsync_ fld = do
$ takeMVar (outputDoorBellFromConsumer svr)
drainFold svr
extract acc = do
let sv = case acc of
Right svr -> svr
Left svr -> svr
liftIO $ sendStop sv Nothing
-- drain/wait until a stop event arrives from the fold.
drainFold sv
------------------------------------------------------------------------------
-- Partitioning
------------------------------------------------------------------------------