Remove unnecessary type parameter from AheadHeapEntry

This commit is contained in:
Harendra Kumar 2024-02-16 03:12:06 +05:30
parent 13d165082b
commit 9684899b0f

View File

@ -26,7 +26,6 @@ import Control.Monad (when, void)
import Control.Monad.IO.Class (MonadIO(liftIO)) import Control.Monad.IO.Class (MonadIO(liftIO))
import Data.Heap (Heap, Entry(..)) import Data.Heap (Heap, Entry(..))
import Data.IORef (IORef, newIORef, readIORef, atomicModifyIORef, writeIORef) import Data.IORef (IORef, newIORef, readIORef, atomicModifyIORef, writeIORef)
import Data.Kind (Type)
import GHC.Exts (inline) import GHC.Exts (inline)
import Streamly.Internal.Control.Concurrent import Streamly.Internal.Control.Concurrent
(MonadRunInIO, RunInIO(..), askRunInIO, restoreM) (MonadRunInIO, RunInIO(..), askRunInIO, restoreM)
@ -250,7 +249,7 @@ enqueueAhead sv q m = do
{-# INLINE dequeueAhead #-} {-# INLINE dequeueAhead #-}
dequeueAhead :: MonadIO m dequeueAhead :: MonadIO m
=> IORef ([t m a], Int) -> m (Maybe (t m a, Int)) => IORef ([K.StreamK m a], Int) -> m (Maybe (K.StreamK m a, Int))
dequeueAhead q = liftIO $ dequeueAhead q = liftIO $
atomicModifyIORefCAS q $ \case atomicModifyIORefCAS q $ \case
([], n) -> (([], n), Nothing) ([], n) -> (([], n), Nothing)
@ -259,7 +258,7 @@ dequeueAhead q = liftIO $
-- Dequeue only if the seq number matches the expected seq number. -- Dequeue only if the seq number matches the expected seq number.
{-# INLINE dequeueAheadSeqCheck #-} {-# INLINE dequeueAheadSeqCheck #-}
dequeueAheadSeqCheck :: MonadIO m dequeueAheadSeqCheck :: MonadIO m
=> IORef ([t m a], Int) -> Int -> m (Maybe (t m a)) => IORef ([K.StreamK m a], Int) -> Int -> m (Maybe (K.StreamK m a))
dequeueAheadSeqCheck q seqNo = liftIO $ dequeueAheadSeqCheck q seqNo = liftIO $
atomicModifyIORefCAS q $ \case atomicModifyIORefCAS q $ \case
([], n) -> (([], n), Nothing) ([], n) -> (([], n), Nothing)
@ -279,20 +278,20 @@ atomicModifyIORef_ :: IORef a -> (a -> a) -> IO ()
atomicModifyIORef_ ref f = atomicModifyIORef_ ref f =
atomicModifyIORef ref $ \x -> (f x, ()) atomicModifyIORef ref $ \x -> (f x, ())
data AheadHeapEntry (t :: (Type -> Type) -> Type -> Type) m a = data AheadHeapEntry m a =
AheadEntryNull AheadEntryNull
| AheadEntryPure a | AheadEntryPure a
| AheadEntryStream (RunInIO m, t m a) | AheadEntryStream (RunInIO m, K.StreamK m a)
data HeapDequeueResult t m a = data HeapDequeueResult m a =
Clearing Clearing
| Waiting Int | Waiting Int
| Ready (Entry Int (AheadHeapEntry t m a)) | Ready (Entry Int (AheadHeapEntry m a))
{-# INLINE dequeueFromHeap #-} {-# INLINE dequeueFromHeap #-}
dequeueFromHeap dequeueFromHeap
:: IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int) :: IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
-> IO (HeapDequeueResult t m a) -> IO (HeapDequeueResult m a)
dequeueFromHeap hpVar = dequeueFromHeap hpVar =
atomicModifyIORef hpVar $ \pair@(hp, snum) -> atomicModifyIORef hpVar $ \pair@(hp, snum) ->
case snum of case snum of
@ -308,9 +307,9 @@ dequeueFromHeap hpVar =
{-# INLINE dequeueFromHeapSeq #-} {-# INLINE dequeueFromHeapSeq #-}
dequeueFromHeapSeq dequeueFromHeapSeq
:: IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int) :: IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
-> Int -> Int
-> IO (HeapDequeueResult t m a) -> IO (HeapDequeueResult m a)
dequeueFromHeapSeq hpVar i = dequeueFromHeapSeq hpVar i =
atomicModifyIORef hpVar $ \(hp, snum) -> atomicModifyIORef hpVar $ \(hp, snum) ->
case snum of case snum of
@ -332,8 +331,8 @@ heapIsSane snum seqNo =
{-# INLINE requeueOnHeapTop #-} {-# INLINE requeueOnHeapTop #-}
requeueOnHeapTop requeueOnHeapTop
:: IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int) :: IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
-> Entry Int (AheadHeapEntry t m a) -> Entry Int (AheadHeapEntry m a)
-> Int -> Int
-> IO () -> IO ()
requeueOnHeapTop hpVar ent seqNo = requeueOnHeapTop hpVar ent seqNo =
@ -342,7 +341,7 @@ requeueOnHeapTop hpVar ent seqNo =
{-# INLINE updateHeapSeq #-} {-# INLINE updateHeapSeq #-}
updateHeapSeq updateHeapSeq
:: IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int) :: IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
-> Int -> Int
-> IO () -> IO ()
updateHeapSeq hpVar seqNo = updateHeapSeq hpVar seqNo =
@ -427,7 +426,7 @@ updateHeapSeq hpVar seqNo =
{-# INLINE underMaxHeap #-} {-# INLINE underMaxHeap #-}
underMaxHeap :: underMaxHeap ::
Channel m a Channel m a
-> Heap (Entry Int (AheadHeapEntry K.StreamK m a)) -> Heap (Entry Int (AheadHeapEntry m a))
-> IO Bool -> IO Bool
underMaxHeap sv hp = do underMaxHeap sv hp = do
(_, len) <- readIORef (outputQueue sv) (_, len) <- readIORef (outputQueue sv)
@ -449,7 +448,7 @@ underMaxHeap sv hp = do
-- False => continue -- False => continue
preStopCheck :: preStopCheck ::
Channel m a Channel m a
-> IORef (Heap (Entry Int (AheadHeapEntry K.StreamK m a)) , Maybe Int) -> IORef (Heap (Entry Int (AheadHeapEntry m a)) , Maybe Int)
-> IO Bool -> IO Bool
preStopCheck sv heap = preStopCheck sv heap =
-- check the stop condition under a lock before actually -- check the stop condition under a lock before actually
@ -502,10 +501,10 @@ abortExecution sv winfo = do
processHeap processHeap
:: MonadRunInIO m :: MonadRunInIO m
=> IORef ([K.StreamK m a], Int) => IORef ([K.StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry K.StreamK m a)), Maybe Int) -> IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
-> Channel m a -> Channel m a
-> Maybe WorkerInfo -> Maybe WorkerInfo
-> AheadHeapEntry K.StreamK m a -> AheadHeapEntry m a
-> Int -> Int
-> Bool -- we are draining the heap before we stop -> Bool -- we are draining the heap before we stop
-> m () -> m ()
@ -613,7 +612,7 @@ processHeap q heap sv winfo entry sno stopping = loopHeap sno entry
drainHeap drainHeap
:: MonadRunInIO m :: MonadRunInIO m
=> IORef ([K.StreamK m a], Int) => IORef ([K.StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry K.StreamK m a)), Maybe Int) -> IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
-> Channel m a -> Channel m a
-> Maybe WorkerInfo -> Maybe WorkerInfo
-> m () -> m ()
@ -629,7 +628,7 @@ data HeapStatus = HContinue | HStop
processWithoutToken processWithoutToken
:: MonadRunInIO m :: MonadRunInIO m
=> IORef ([K.StreamK m a], Int) => IORef ([K.StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry K.StreamK m a)), Maybe Int) -> IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
-> Channel m a -> Channel m a
-> Maybe WorkerInfo -> Maybe WorkerInfo
-> K.StreamK m a -> K.StreamK m a
@ -709,7 +708,7 @@ data TokenWorkerStatus = TokenContinue Int | TokenSuspend
processWithToken processWithToken
:: MonadRunInIO m :: MonadRunInIO m
=> IORef ([K.StreamK m a], Int) => IORef ([K.StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry K.StreamK m a)), Maybe Int) -> IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
-> Channel m a -> Channel m a
-> Maybe WorkerInfo -> Maybe WorkerInfo
-> K.StreamK m a -> K.StreamK m a
@ -818,7 +817,7 @@ processWithToken q heap sv winfo action sno = do
workLoopAhead workLoopAhead
:: MonadRunInIO m :: MonadRunInIO m
=> IORef ([K.StreamK m a], Int) => IORef ([K.StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry K.StreamK m a)), Maybe Int) -> IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
-> Channel m a -> Channel m a
-> Maybe WorkerInfo -> Maybe WorkerInfo
-> m () -> m ()