diff --git a/src/Streamly/Internal/Data/Stream/Channel/Append.hs b/src/Streamly/Internal/Data/Stream/Channel/Append.hs index d1d7806ca..0b9e13a56 100644 --- a/src/Streamly/Internal/Data/Stream/Channel/Append.hs +++ b/src/Streamly/Internal/Data/Stream/Channel/Append.hs @@ -26,7 +26,6 @@ import Control.Monad (when, void) import Control.Monad.IO.Class (MonadIO(liftIO)) import Data.Heap (Heap, Entry(..)) import Data.IORef (IORef, newIORef, readIORef, atomicModifyIORef, writeIORef) -import Data.Kind (Type) import GHC.Exts (inline) import Streamly.Internal.Control.Concurrent (MonadRunInIO, RunInIO(..), askRunInIO, restoreM) @@ -250,7 +249,7 @@ enqueueAhead sv q m = do {-# INLINE dequeueAhead #-} dequeueAhead :: MonadIO m - => IORef ([t m a], Int) -> m (Maybe (t m a, Int)) + => IORef ([K.StreamK m a], Int) -> m (Maybe (K.StreamK m a, Int)) dequeueAhead q = liftIO $ atomicModifyIORefCAS q $ \case ([], n) -> (([], n), Nothing) @@ -259,7 +258,7 @@ dequeueAhead q = liftIO $ -- Dequeue only if the seq number matches the expected seq number. {-# INLINE dequeueAheadSeqCheck #-} dequeueAheadSeqCheck :: MonadIO m - => IORef ([t m a], Int) -> Int -> m (Maybe (t m a)) + => IORef ([K.StreamK m a], Int) -> Int -> m (Maybe (K.StreamK m a)) dequeueAheadSeqCheck q seqNo = liftIO $ atomicModifyIORefCAS q $ \case ([], n) -> (([], n), Nothing) @@ -279,20 +278,20 @@ atomicModifyIORef_ :: IORef a -> (a -> a) -> IO () atomicModifyIORef_ ref f = atomicModifyIORef ref $ \x -> (f x, ()) -data AheadHeapEntry (t :: (Type -> Type) -> Type -> Type) m a = +data AheadHeapEntry m a = AheadEntryNull | AheadEntryPure a - | AheadEntryStream (RunInIO m, t m a) + | AheadEntryStream (RunInIO m, K.StreamK m a) -data HeapDequeueResult t m a = +data HeapDequeueResult m a = Clearing | Waiting Int - | Ready (Entry Int (AheadHeapEntry t m a)) + | Ready (Entry Int (AheadHeapEntry m a)) {-# INLINE dequeueFromHeap #-} dequeueFromHeap - :: IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int) - -> IO (HeapDequeueResult t m a) + :: IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int) + -> IO (HeapDequeueResult m a) dequeueFromHeap hpVar = atomicModifyIORef hpVar $ \pair@(hp, snum) -> case snum of @@ -308,9 +307,9 @@ dequeueFromHeap hpVar = {-# INLINE dequeueFromHeapSeq #-} dequeueFromHeapSeq - :: IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int) + :: IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int) -> Int - -> IO (HeapDequeueResult t m a) + -> IO (HeapDequeueResult m a) dequeueFromHeapSeq hpVar i = atomicModifyIORef hpVar $ \(hp, snum) -> case snum of @@ -332,8 +331,8 @@ heapIsSane snum seqNo = {-# INLINE requeueOnHeapTop #-} requeueOnHeapTop - :: IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int) - -> Entry Int (AheadHeapEntry t m a) + :: IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int) + -> Entry Int (AheadHeapEntry m a) -> Int -> IO () requeueOnHeapTop hpVar ent seqNo = @@ -342,7 +341,7 @@ requeueOnHeapTop hpVar ent seqNo = {-# INLINE updateHeapSeq #-} updateHeapSeq - :: IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int) + :: IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int) -> Int -> IO () updateHeapSeq hpVar seqNo = @@ -427,7 +426,7 @@ updateHeapSeq hpVar seqNo = {-# INLINE underMaxHeap #-} underMaxHeap :: Channel m a - -> Heap (Entry Int (AheadHeapEntry K.StreamK m a)) + -> Heap (Entry Int (AheadHeapEntry m a)) -> IO Bool underMaxHeap sv hp = do (_, len) <- readIORef (outputQueue sv) @@ -449,7 +448,7 @@ underMaxHeap sv hp = do -- False => continue preStopCheck :: Channel m a - -> IORef (Heap (Entry Int (AheadHeapEntry K.StreamK m a)) , Maybe Int) + -> IORef (Heap (Entry Int (AheadHeapEntry m a)) , Maybe Int) -> IO Bool preStopCheck sv heap = -- check the stop condition under a lock before actually @@ -502,10 +501,10 @@ abortExecution sv winfo = do processHeap :: MonadRunInIO m => IORef ([K.StreamK m a], Int) - -> IORef (Heap (Entry Int (AheadHeapEntry K.StreamK m a)), Maybe Int) + -> IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int) -> Channel m a -> Maybe WorkerInfo - -> AheadHeapEntry K.StreamK m a + -> AheadHeapEntry m a -> Int -> Bool -- we are draining the heap before we stop -> m () @@ -613,7 +612,7 @@ processHeap q heap sv winfo entry sno stopping = loopHeap sno entry drainHeap :: MonadRunInIO m => IORef ([K.StreamK m a], Int) - -> IORef (Heap (Entry Int (AheadHeapEntry K.StreamK m a)), Maybe Int) + -> IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int) -> Channel m a -> Maybe WorkerInfo -> m () @@ -629,7 +628,7 @@ data HeapStatus = HContinue | HStop processWithoutToken :: MonadRunInIO m => IORef ([K.StreamK m a], Int) - -> IORef (Heap (Entry Int (AheadHeapEntry K.StreamK m a)), Maybe Int) + -> IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int) -> Channel m a -> Maybe WorkerInfo -> K.StreamK m a @@ -709,7 +708,7 @@ data TokenWorkerStatus = TokenContinue Int | TokenSuspend processWithToken :: MonadRunInIO m => IORef ([K.StreamK m a], Int) - -> IORef (Heap (Entry Int (AheadHeapEntry K.StreamK m a)), Maybe Int) + -> IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int) -> Channel m a -> Maybe WorkerInfo -> K.StreamK m a @@ -818,7 +817,7 @@ processWithToken q heap sv winfo action sno = do workLoopAhead :: MonadRunInIO m => IORef ([K.StreamK m a], Int) - -> IORef (Heap (Entry Int (AheadHeapEntry K.StreamK m a)), Maybe Int) + -> IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int) -> Channel m a -> Maybe WorkerInfo -> m ()