Refactor gbracket, Add ghandle to retry the stream on exception.

This commit is contained in:
Pranay Sashank 2020-08-22 13:59:23 +05:30 committed by pranaysashank
parent c8c161c34b
commit 971da77528

View File

@ -292,6 +292,7 @@ module Streamly.Internal.Data.Stream.StreamD
, onException
, finally_
, finally
, ghandle
, handle
-- * Concurrent Application
@ -3138,7 +3139,7 @@ gbracket_
=> m c -- ^ before
-> (forall s. m s -> m (Either e s)) -- ^ try (exception handling)
-> (c -> m d) -- ^ after, on normal stop
-> (c -> e -> Stream m b) -- ^ on exception
-> (c -> e -> Stream m b -> Stream m b) -- ^ on exception
-> (c -> Stream m b) -- ^ stream generator
-> Stream m b
gbracket_ bef exc aft fexc fnormal =
@ -3159,7 +3160,8 @@ gbracket_ bef exc aft fexc fnormal =
return $ Yield x (GBracketNormal (Stream step1 s) v)
Skip s -> return $ Skip (GBracketNormal (Stream step1 s) v)
Stop -> aft v >> return Stop
Left e -> return $ Skip (GBracketException (fexc v e))
Left e ->
return $ Skip (GBracketException (fexc v e (UnStream step1 st)))
step gst (GBracketException (UnStream step1 st)) = do
res <- step1 gst st
case res of
@ -3220,7 +3222,7 @@ gbracket
=> m c -- ^ before
-> (forall s. m s -> m (Either e s)) -- ^ try (exception handling)
-> (c -> m d) -- ^ after, on normal stop or GC
-> (c -> e -> Stream m b) -- ^ on exception
-> (c -> e -> Stream m b -> Stream m b) -- ^ on exception
-> (c -> Stream m b) -- ^ stream generator
-> Stream m b
gbracket bef exc aft fexc fnormal =
@ -3256,7 +3258,7 @@ gbracket bef exc aft fexc fnormal =
return Stop
Left e -> do
clearIORefFinalizer ref
return $ Skip (GBracketIOException (fexc v e))
return $ Skip (GBracketIOException (fexc v e (UnStream step1 st)))
step gst (GBracketIOException (UnStream step1 st)) = do
res <- step1 gst st
case res of
@ -3332,7 +3334,7 @@ after action (Stream step state) = Stream step' Nothing
onException :: MonadCatch m => m b -> Stream m a -> Stream m a
onException action str =
gbracket_ (return ()) MC.try return
(\_ (e :: MC.SomeException) -> nilM (action >> MC.throwM e))
(\_ (e :: MC.SomeException) _ -> nilM (action >> MC.throwM e))
(\_ -> str)
{-# INLINE_NORMAL _onException #-}
@ -3361,7 +3363,7 @@ _onException action (Stream step state) = Stream step' state
bracket_ :: MonadCatch m => m b -> (b -> m c) -> (b -> Stream m a) -> Stream m a
bracket_ bef aft bet =
gbracket_ bef MC.try aft
(\a (e :: SomeException) -> nilM (aft a >> MC.throwM e)) bet
(\a (e :: SomeException) _ -> nilM (aft a >> MC.throwM e)) bet
-- | Run the first action before the stream starts and remember its output,
-- generate a stream using the output, run the second action providing the
@ -3373,7 +3375,7 @@ bracket :: (MonadAsync m, MonadCatch m)
=> m b -> (b -> m c) -> (b -> Stream m a) -> Stream m a
bracket bef aft bet =
gbracket bef MC.try aft
(\a (e :: SomeException) -> nilM (aft a >> MC.throwM e)) bet
(\a (e :: SomeException) _ -> nilM (aft a >> MC.throwM e)) bet
data BracketState s v = BracketInit | BracketRun s v
@ -3412,13 +3414,23 @@ finally_ action xs = bracket_ (return ()) (\_ -> action) (const xs)
finally :: (MonadAsync m, MonadCatch m) => m b -> Stream m a -> Stream m a
finally action xs = bracket (return ()) (\_ -> action) (const xs)
-- | When evaluating a stream if an exception occurs, stream
-- evaluation aborts and the specified exception handler is run with
-- the exception and the Stream which threw the exception as argument.
--
{-# INLINE_NORMAL ghandle #-}
ghandle :: (MonadCatch m, Exception e)
=> (e -> Stream m a -> Stream m a) -> Stream m a -> Stream m a
ghandle f str =
gbracket_ (return ()) MC.try return (\_ -> f) (\_ -> str)
-- | When evaluating a stream if an exception occurs, stream evaluation aborts
-- and the specified exception handler is run with the exception as argument.
{-# INLINE_NORMAL handle #-}
handle :: (MonadCatch m, Exception e)
=> (e -> Stream m a) -> Stream m a -> Stream m a
handle f str =
gbracket_ (return ()) MC.try return (\_ e -> f e) (\_ -> str)
gbracket_ (return ()) MC.try return (\_ e _ -> f e) (\_ -> str)
{-# INLINE_NORMAL _handle #-}
_handle :: (MonadCatch m, Exception e)